
基于 Python 与 Tushare 的技术实现方案
更新时间:2026 年 3 月 21 日
股市中有两种投资者:一种在股价暴涨后追高买入,最终站在山顶吹冷风;另一种则能在拉升前夕精准埋伏,享受主升浪带来的丰厚收益。两者的差距,往往就在于能否提前识别出那些即将进入加速上涨阶段的标的。
这种能力并非天赋异禀者的专利。通过量化分析,我们可以将市场经验转化为可执行的检测逻辑,让数据替我们在成千上万只股票中筛选出真正的机会。
本文将展示如何利用 Python 和 Tushare 构建一套完整的强势行情预警系统,从数据获取到信号生成,从指标计算到可视化输出,每一步都配有可运行的代码实现。
💡 想要快速上手?试试官方技能包
如果你希望跳过繁琐的环境配置,直接体验 Tushare 的强大功能,我们为你准备了一个更便捷的选择:
🎯 ClawHub 官方 Tushare Skills
📦 一键安装:https://clawhub.ai/lidayan/tushare-data这是由官方团队精心封装的技能包,将本文涉及的核心数据获取能力打包成开箱即用的工具。无论你是想快速验证策略思路,还是作为学习量化交易的起点,这个技能包都能让你在几分钟内开始实战。
当然,如果你想深入理解每一行代码背后的逻辑,或者需要高度定制化的功能,那么请继续阅读本文——我们将手把手带你从零构建一套完整的系统。
当一只股票进入主升浪时,市场会用自己的语言发出信号。这些信号并非玄学,而是可以被量化观测的客观现象:
成交量的爆发性增长
资金是市场最诚实的投票器。当成交量突然放大到日均水平的 1.5 倍甚至 2 倍以上时,意味着有大量新资金正在涌入。这种放量往往先于价格的大幅上涨,是最早期的预警信号之一。
均线系统的黄金排列
短期均线(5 日)上穿中期均线(20 日),中期均线上穿长期均线(60 日),三条均线呈现向上发散的态势。这种形态在技术分析中被称为“多头排列”,是趋势确立的重要标志。
价格对压力位的突破
布林带上轨是一个动态的压力位,代表价格的统计学上限。当股价有效突破这条上轨时,往往意味着市场情绪已经突破理性区间,进入加速上涨的“非理性繁荣”阶段。
持续性的价格上涨
单日暴涨可能是偶然,但连续 3-5 天的稳步攀升,尤其是在量能配合的情况下,则是趋势形成的确认信号。
理解了这些市场语言,我们就可以着手将其转化为计算机可识别的量化指标。
在量化交易的世界里,数据就是弹药。Tushare 作为国内最完善的金融数据平台,提供了超过 220 个数据接口,覆盖股票、基金、期货、宏观经济等各个维度。
第一步:安装必要的工具包
# 核心依赖安装
pip install tushare pandas numpy matplotlib
# 如果需要更高级的可视化
pip install plotly mplfinance
第二步:获取并配置 Token
访问 Tushare 官网 注册账号,在个人中心获取 Token。这是访问数据接口的唯一凭证。
# Linux/Mac环境变量设置
echo'export TUSHARE_TOKEN="你的Token"' >> ~/.bashrc
source ~/.bashrc
# Windows环境变量设置
set TUSHARE_TOKEN "你的Token"
第三步:验证连接
import tushare as ts
import os
# 初始化连接
token = os.getenv('TUSHARE_TOKEN')
pro = ts.pro_api(token)
# 测试接口
df = pro.stock_basic(exchange='', list_status='L', fields='ts_code,name')
print(f"成功连接!当前A股上市公司数量:{len(df)}")
一个健壮的数据获取模块需要考虑异常处理、数据清洗和格式转换:
import pandas as pd
from datetime import datetime, timedelta
from typing importOptional
classStockDataFetcher:
"""股票数据获取器"""
def__init__(self, token: str):
"""
初始化数据获取器
Args:
token: Tushare API Token
"""self.pro = ts.pro_api(token)
defget_daily_data(self,
ts_code: str,
start_date: str,
end_date: str,
adjust: str = 'qfq') -> Optional[pd.DataFrame]:"""
获取股票日线数据
Args:
ts_code: 股票代码(如'000001.SZ')
start_date: 开始日期(格式:YYYYMMDD)
end_date: 结束日期(格式:YYYYMMDD)
adjust: 复权类型('qfq'前复权,'hfq'后复权,None不复权)
Returns:
包含OHLCV数据的DataFrame,失败返回None
"""try:
# 获取原始数据
df = self.pro.daily(
ts_code=ts_code,
start_date=start_date,
end_date=end_date
)
if df.empty:
print(f"警告:{ts_code} 在指定日期范围内无数据")
returnNone
# 数据清洗与格式化
df = df.sort_values('trade_date').reset_index(drop=True)
df['trade_date'] = pd.to_datetime(df['trade_date'])
# 处理复权(如果需要)
if adjust:
df = self._apply_adjust(df, ts_code, adjust)
# 重命名列,使其更符合直觉
df = df.rename(columns={
'trade_date': 'date',
'vol': 'volume',
'amount': 'turnover'
})
return df
except Exception as e:
print(f"数据获取失败:{ts_code} - {str(e)}")
returnNone
def_apply_adjust(self, df: pd.DataFrame,
ts_code: str,
adjust_type: str) -> pd.DataFrame:"""
应用复权因子
注:这里简化处理,实际应用中需要获取复权因子进行计算
"""# 实际项目中应该调用 pro.adj_factor() 获取复权因子
# 这里为演示简化处理
return df
defget_realtime_quote(self, ts_code: str) -> Optional[dict]:
"""
获取实时行情(需要更高级别权限)
Args:
ts_code: 股票代码
Returns:
包含实时价格、涨跌幅等信息的字典
"""try:
df = self.pro.daily(ts_code=ts_code,
trade_date=datetime.now().strftime('%Y%m%d'))
ifnot df.empty:
return df.iloc[0].to_dict()
returnNone
except:
returnNone
# 使用示例
fetcher = StockDataFetcher(token=os.getenv('TUSHARE_TOKEN'))
# 获取平安银行近一年数据
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
stock_data = fetcher.get_daily_data('000001.SZ', start_date, end_date)
if stock_data isnotNone:
print(f"
数据获取成功!共{len(stock_data)}个交易日")print("
最近5日行情:")print(stock_data[['date', 'close', 'volume', 'pct_chg']].tail())
数据样本输出:
数据获取成功!共243个交易日
最近5日行情:
date close volume pct_chg
238 2026-03-17 11.35 85000.0 1.42
239 2026-03-18 11.40 72000.0 0.44
240 2026-03-19 11.48 68000.0 0.70
241 2026-03-20 11.55 75000.0 0.61
242 2026-03-21 11.62 82000.0 0.61
技术指标是将价格和成交量转化为可判断信号的数学工具。一个完善的指标计算引擎需要模块化设计,便于扩展和维护:
import numpy as np
from typing importTuple
classTechnicalIndicators:
"""技术指标计算引擎"""
@staticmethod
defmoving_average(series: pd.Series, window: int) -> pd.Series:
"""
简单移动平均线(SMA)
Args:
series: 价格序列
window: 窗口期
Returns:
移动平均值序列
"""return series.rolling(window=window, min_periods=window).mean()
@staticmethod
defexponential_moving_average(series: pd.Series, span: int) -> pd.Series:
"""
指数移动平均线(EMA)
Args:
series: 价格序列
span: 跨度参数
Returns:
指数移动平均值序列
"""return series.ewm(span=span, adjust=False).mean()
@staticmethod
defbollinger_bands(series: pd.Series,
window: int = 20,
num_std: float = 2.0) -> Tuple[pd.Series, pd.Series, pd.Series]:"""
布林带指标
Args:
series: 价格序列
window: 计算窗口
num_std: 标准差倍数
Returns:
(上轨, 中轨, 下轨)
"""middle = series.rolling(window=window).mean()
std = series.rolling(window=window).std()
upper = middle + (std * num_std)
lower = middle - (std * num_std)
return upper, middle, lower
@staticmethod
defvolume_ratio(volume: pd.Series, window: int = 20) -> pd.Series:
"""
量比指标(当前成交量/均量)
Args:
volume: 成交量序列
window: 均量计算窗口
Returns:
量比序列
"""avg_volume = volume.rolling(window=window).mean()
return volume / avg_volume
@staticmethod
defmacd(series: pd.Series,
fast: int = 12,
slow: int = 26,
signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:"""
MACD指标
Args:
series: 价格序列
fast: 快线周期
slow: 慢线周期
signal: 信号线周期
Returns:
(DIF, DEA, MACD柱)
"""ema_fast = series.ewm(span=fast, adjust=False).mean()
ema_slow = series.ewm(span=slow, adjust=False).mean()
dif = ema_fast - ema_slow
dea = dif.ewm(span=signal, adjust=False).mean()
macd_bar = (dif - dea) * 2
return dif, dea, macd_bar
@staticmethod
defrsi(series: pd.Series, period: int = 14) -> pd.Series:
"""
相对强弱指标(RSI)
Args:
series: 价格序列
period: 计算周期
Returns:
RSI值序列(0-100)
"""delta = series.diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=period).mean()
avg_loss = loss.rolling(window=period).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
@staticmethod
defatr(high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14) -> pd.Series:"""
平均真实波幅(ATR)
Args:
high: 最高价序列
low: 最低价序列
close: 收盘价序列
period: 计算周期
Returns:
ATR值序列
"""high_low = high - low
high_close = np.abs(high - close.shift())
low_close = np.abs(low - close.shift())
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
atr = true_range.rolling(window=period).mean()
return atr
classStockAnalyzer:
"""股票技术分析器"""
def__init__(self, data: pd.DataFrame):
"""
初始化分析器
Args:
data: 包含OHLCV的股票数据
"""self.data = data.copy()
self.indicators = TechnicalIndicators()
defcalculate_all_indicators(self) -> pd.DataFrame:
"""
计算所有技术指标
Returns:
添加了技术指标列的DataFrame
"""df = self.data
# 均线系统
df['ma5'] = self.indicators.moving_average(df['close'], 5)
df['ma10'] = self.indicators.moving_average(df['close'], 10)
df['ma20'] = self.indicators.moving_average(df['close'], 20)
df['ma60'] = self.indicators.moving_average(df['close'], 60)
# 成交量指标
df['vol_ma20'] = self.indicators.moving_average(df['volume'], 20)
df['vol_ratio'] = self.indicators.volume_ratio(df['volume'], 20)
# 布林带
df['bb_upper'], df['bb_middle'], df['bb_lower'] = \
self.indicators.bollinger_bands(df['close'], 20, 2.0)
# MACD
df['macd_dif'], df['macd_dea'], df['macd_bar'] = \
self.indicators.macd(df['close'])
# RSI
df['rsi'] = self.indicators.rsi(df['close'], 14)
# ATR(用于止损计算)
df['atr'] = self.indicators.atr(df['high'], df['low'], df['close'], 14)
# 派生指标
df['price_position'] = (df['close'] - df['bb_lower']) / \
(df['bb_upper'] - df['bb_lower'])
df['ma_trend'] = (df['ma5'] > df['ma20']) & (df['ma20'] > df['ma60'])
df['price_change_3d'] = df['close'].pct_change(periods=3) * 100
return df.dropna()
# 使用示例
analyzer = StockAnalyzer(stock_data)
enriched_data = analyzer.calculate_all_indicators()
print("
技术指标计算完成!")print(f"指标列数:{len(enriched_data.columns)}")
print("
最新指标快照:")print(enriched_data[['date', 'close', 'ma5', 'ma20', 'vol_ratio', 'rsi']].tail(3))
不同的技术指标从不同维度刻画市场状态:
| 趋势类 | |||
| 动量类 | |||
| 波动类 | |||
| 量能类 | |||
| 综合类 |
单一指标容易产生虚假信号,真正有效的检测系统需要多个维度的共振确认:
from dataclasses import dataclass
from typing importList, Dict
from enum import Enum
classSignalStrength(Enum):
"""信号强度等级"""
WEAK = 1# 弱信号
MODERATE = 2# 中等信号
STRONG = 3# 强信号
@dataclass
classTradingSignal:
"""交易信号数据类"""
date: datetime
ts_code: str
signal_type: str
strength: SignalStrength
price: float
indicators: Dict[str, float]
description: str
classSignalDetector:
"""强势行情信号检测器"""
def__init__(self, data: pd.DataFrame, config: dict = None):
"""
初始化检测器
Args:
data: 包含技术指标的股票数据
config: 检测参数配置
"""self.data = data
self.config = config orself._default_config()
@staticmethod
def_default_config() -> dict:
"""默认检测参数"""
return {
'vol_ratio_threshold': 1.5, # 量比阈值
'price_change_threshold': 5.0, # 3日涨幅阈值(%)
'rsi_range': (40, 80), # RSI有效区间
'bb_breakout': True, # 是否要求突破布林上轨
'ma_alignment': True, # 是否要求均线多头
'min_signal_gap': 3# 信号最小间隔(天)
}
defdetect_main_uplift(self) -> pd.DataFrame:
"""
检测主拉升信号
Returns:
添加了信号标记的DataFrame
"""df = self.data.copy()
cfg = self.config
# 基础条件组合
volume_surge = df['vol_ratio'] > cfg['vol_ratio_threshold']
price_momentum = df['price_change_3d'] > cfg['price_change_threshold']
price_above_ma5 = df['close'] > df['ma5']
ma5_above_ma20 = df['ma5'] > df['ma20']
# RSI过滤(避免超买区)
rsi_valid = (df['rsi'] > cfg['rsi_range'][0]) & \
(df['rsi'] < cfg['rsi_range'][1])
# 组合基础信号
basic_signal = (
volume_surge &
price_momentum &
price_above_ma5 &
ma5_above_ma20 &
rsi_valid
)
# 可选的严格条件
if cfg['bb_breakout']:
bb_break = df['close'] > df['bb_upper']
basic_signal = basic_signal & bb_break
if cfg['ma_alignment']:
ma_trend = df['ma_trend']
basic_signal = basic_signal & ma_trend
# 信号去重(避免连续触发)
df['raw_signal'] = basic_signal
df['signal'] = self._filter_consecutive_signals(
df['raw_signal'],
cfg['min_signal_gap']
)
# 计算信号强度
df['signal_strength'] = df.apply(
lambda row: self._calculate_signal_strength(row) if row['signal'] else0,
axis=1
)
return df
@staticmethod
def_filter_consecutive_signals(signal_series: pd.Series,
min_gap: int) -> pd.Series:"""
过滤连续信号,保持最小间隔
Args:
signal_series: 原始信号序列
min_gap: 最小间隔天数
Returns:
过滤后的信号序列
"""filtered = signal_series.copy()
last_signal_idx = -min_gap
for idx in signal_series[signal_series].index:
if idx - last_signal_idx < min_gap:
filtered.loc[idx] = False
else:
last_signal_idx = idx
return filtered
def_calculate_signal_strength(self, row: pd.Series) -> int:
"""
计算信号强度评分
评分规则:
- 量比越大,得分越高
- 涨幅越大,得分越高
- RSI在50-70之间得分最高
- 突破布林上轨加分
Returns:
1-3的强度等级
"""score = 0
# 量能评分(0-3分)
if row['vol_ratio'] > 2.5:
score += 3
elif row['vol_ratio'] > 2.0:
score += 2
elif row['vol_ratio'] > 1.5:
score += 1
# 涨幅评分(0-3分)
if row['price_change_3d'] > 10:
score += 3
elif row['price_change_3d'] > 7:
score += 2
elif row['price_change_3d'] > 5:
score += 1
# RSI评分(0-2分)
if50 <= row['rsi'] <= 70:
score += 2
elif40 <= row['rsi'] < 50or70 < row['rsi'] <= 80:
score += 1
# 布林带突破加分
if row['close'] > row['bb_upper']:
score += 2
# 转换为强度等级
if score >= 8:
return SignalStrength.STRONG.value
elif score >= 5:
return SignalStrength.MODERATE.value
else:
return SignalStrength.WEAK.value
defgenerate_signal_report(self) -> List[TradingSignal]:
"""
生成信号报告
Returns:
交易信号对象列表
"""df = self.detect_main_uplift()
signals = []
for idx, row in df[df['signal']].iterrows():
signal = TradingSignal(
date=row['date'],
ts_code=row.get('ts_code', 'Unknown'),
signal_type='主拉升',
strength=SignalStrength(row['signal_strength']),
price=row['close'],
indicators={
'vol_ratio': round(row['vol_ratio'], 2),
'price_change_3d': round(row['price_change_3d'], 2),
'rsi': round(row['rsi'], 2),
'ma5': round(row['ma5'], 2),
'ma20': round(row['ma20'], 2)
},
description=self._generate_description(row)
)
signals.append(signal)
return signals
@staticmethod
def_generate_description(row: pd.Series) -> str:
"""生成信号描述文本"""
desc_parts = []
if row['vol_ratio'] > 2.0:
desc_parts.append(f"放量{row['vol_ratio']:.1f}倍")
if row['price_change_3d'] > 7:
desc_parts.append(f"3日涨{row['price_change_3d']:.1f}%")
if row['close'] > row['bb_upper']:
desc_parts.append("突破布林上轨")
if row['ma_trend']:
desc_parts.append("均线多头排列")
return",".join(desc_parts) if desc_parts else"标准信号"
# 使用示例
detector = SignalDetector(enriched_data)
result_df = detector.detect_main_uplift()
signals = detector.generate_signal_report()
print(f"
检测到 {len(signals)} 个主拉升信号
")for sig in signals[-5:]: # 显示最近5个信号
print(f"日期:{sig.date.strftime('%Y-%m-%d')}")
print(f"价格:{sig.price:.2f} | 强度:{sig.strength.name}")
print(f"特征:{sig.description}")
print(f"指标:{sig.indicators}")
print("-" * 50)
数据可视化不仅是为了美观,更是为了快速识别模式和异常:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
import mplfinance as mpf
classChartGenerator:
"""图表生成器"""
def__init__(self, data: pd.DataFrame, signals: pd.DataFrame = None):
"""
初始化图表生成器
Args:
data: 股票数据
signals: 信号数据
"""self.data = data
self.signals = signals
self._setup_style()
@staticmethod
def_setup_style():
"""设置绘图样式"""
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.facecolor'] = 'white'
defplot_comprehensive_analysis(self,
stock_name: str,
ts_code: str,
save_path: str = None) -> plt.Figure:"""
绘制综合分析图表
包含:价格走势、均线、布林带、成交量、MACD、RSI
Args:
stock_name: 股票名称
ts_code: 股票代码
save_path: 保存路径
Returns:
matplotlib Figure对象
"""fig = plt.figure(figsize=(16, 12))
gs = fig.add_gridspec(4, 1, height_ratios=[3, 1, 1, 1], hspace=0.3)
# 子图1:价格与均线
ax1 = fig.add_subplot(gs[0])
self._plot_price_and_ma(ax1, stock_name, ts_code)
# 子图2:成交量
ax2 = fig.add_subplot(gs[1], sharex=ax1)
self._plot_volume(ax2)
# 子图3:MACD
ax3 = fig.add_subplot(gs[2], sharex=ax1)
self._plot_macd(ax3)
# 子图4:RSI
ax4 = fig.add_subplot(gs[3], sharex=ax1)
self._plot_rsi(ax4)
# 隐藏中间子图的x轴标签
for ax in [ax1, ax2, ax3]:
plt.setp(ax.get_xticklabels(), visible=False)
# 格式化x轴日期
ax4.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
ax4.xaxis.set_major_locator(mdates.MonthLocator())
plt.setp(ax4.xaxis.get_majorticklabels(), rotation=45, ha='right')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"图表已保存至:{save_path}")
return fig
def_plot_price_and_ma(self, ax, stock_name: str, ts_code: str):
"""绘制价格与均线"""
df = self.data
# 绘制价格线
ax.plot(df['date'], df['close'], label='收盘价',
linewidth=2, color='#2E86AB', zorder=3)
# 绘制均线
ax.plot(df['date'], df['ma5'], label='MA5',
linewidth=1.5, alpha=0.8, color='#A23B72')
ax.plot(df['date'], df['ma20'], label='MA20',
linewidth=1.5, alpha=0.8, color='#F18F01')
ax.plot(df['date'], df['ma60'], label='MA60',
linewidth=1.5, alpha=0.8, color='#C73E1D')
# 绘制布林带
ax.fill_between(df['date'], df['bb_lower'], df['bb_upper'],
alpha=0.1, color='gray', label='布林带')
ax.plot(df['date'], df['bb_upper'], '--',
linewidth=1, alpha=0.5, color='gray')
ax.plot(df['date'], df['bb_lower'], '--',
linewidth=1, alpha=0.5, color='gray')
# 标记信号点
ifself.signals isnotNone:
signal_df = df[df['signal']]
# 按强度分类标记
strong = signal_df[signal_df['signal_strength'] == 3]
moderate = signal_df[signal_df['signal_strength'] == 2]
weak = signal_df[signal_df['signal_strength'] == 1]
ifnot strong.empty:
ax.scatter(strong['date'], strong['close'],
color='red', marker='^', s=150,
label='强信号', zorder=5, edgecolors='darkred', linewidths=1.5)
ifnot moderate.empty:
ax.scatter(moderate['date'], moderate['close'],
color='orange', marker='^', s=100,
label='中等信号', zorder=4, edgecolors='darkorange', linewidths=1)
ifnot weak.empty:
ax.scatter(weak['date'], weak['close'],
color='yellow', marker='^', s=70,
label='弱信号', zorder=3, edgecolors='gold', linewidths=0.8)
ax.set_title(f'{stock_name}({ts_code}) - 强势行情分析',
fontsize=16, fontweight='bold', pad=15)
ax.set_ylabel('价格(元)', fontsize=12)
ax.legend(loc='upper left', framealpha=0.9, fontsize=10)
ax.grid(True, alpha=0.3, linestyle='--')
def_plot_volume(self, ax):
"""绘制成交量"""
df = self.data
# 根据涨跌着色
colors = ['#EF476F'if df['close'].iloc[i] >= df['open'].iloc[i]
else'#06D6A0'for i inrange(len(df))]
ax.bar(df['date'], df['volume'], color=colors, alpha=0.7, width=0.8)
ax.plot(df['date'], df['vol_ma20'], color='#118AB2',
linewidth=2, label='量均线')
# 标记放量区域
high_vol = df[df['vol_ratio'] > 1.5]
ifnot high_vol.empty:
ax.scatter(high_vol['date'], high_vol['volume'],
color='red', marker='*', s=80,
label='放量', zorder=5, alpha=0.8)
ax.set_ylabel('成交量', fontsize=12)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3, linestyle='--')
def_plot_macd(self, ax):
"""绘制MACD"""
df = self.data
# MACD柱状图
colors = ['#EF476F'if val >= 0else'#06D6A0'
for val in df['macd_bar']]
ax.bar(df['date'], df['macd_bar'], color=colors, alpha=0.6, width=0.8)
# DIF和DEA线
ax.plot(df['date'], df['macd_dif'], label='DIF',
linewidth=1.5, color='#2E86AB')
ax.plot(df['date'], df['macd_dea'], label='DEA',
linewidth=1.5, color='#F18F01')
# 零轴线
ax.axhline(y=0, color='gray', linestyle='-', linewidth=0.8, alpha=0.5)
ax.set_ylabel('MACD', fontsize=12)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3, linestyle='--')
def_plot_rsi(self, ax):
"""绘制RSI"""
df = self.data
ax.plot(df['date'], df['rsi'], linewidth=2, color='#A23B72')
# 超买超卖区域
ax.axhline(y=70, color='red', linestyle='--',
linewidth=1, alpha=0.5, label='超买线')
ax.axhline(y=30, color='green', linestyle='--',
linewidth=1, alpha=0.5, label='超卖线')
ax.axhline(y=50, color='gray', linestyle='-',
linewidth=0.8, alpha=0.3)
# 填充区域
ax.fill_between(df['date'], 70, 100, alpha=0.1, color='red')
ax.fill_between(df['date'], 0, 30, alpha=0.1, color='green')
ax.set_ylabel('RSI', fontsize=12)
ax.set_ylim(0, 100)
ax.legend(loc='upper left', fontsize=10)
ax.grid(True, alpha=0.3, linestyle='--')
ax.set_xlabel('日期', fontsize=12)
# 使用示例
chart_gen = ChartGenerator(result_df, result_df)
fig = chart_gen.plot_comprehensive_analysis('平安银行', '000001.SZ',
save_path='analysis_chart.png')
plt.show()
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
classStockScreener:
"""股票筛选器"""
def__init__(self, fetcher: StockDataFetcher):
"""
初始化筛选器
Args:
fetcher: 数据获取器实例
"""self.fetcher = fetcher
self.results = []
defscan_stock_pool(self,
stock_pool: Dict[str, str],
start_date: str,
end_date: str,
max_workers: int = 5) -> pd.DataFrame:"""
批量扫描股票池
Args:
stock_pool: {股票代码: 股票名称}字典
start_date: 开始日期
end_date: 结束日期
max_workers: 并发线程数
Returns:
包含所有股票分析结果的DataFrame
"""print(f"开始扫描{len(stock_pool)}只股票...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交任务
future_to_stock = {
executor.submit(
self._analyze_single_stock,
code, name, start_date, end_date
): (code, name)
for code, name in stock_pool.items()
}
# 收集结果
for future in tqdm(as_completed(future_to_stock),
total=len(stock_pool),
desc="分析进度"):
code, name = future_to_stock[future]
try:
result = future.result()
if result:
self.results.append(result)
except Exception as e:
print(f"
{name}({code})分析失败:{str(e)}")# 转换为DataFrame
ifself.results:
df = pd.DataFrame(self.results)
df = df.sort_values('signal_strength', ascending=False)
return df
else:
return pd.DataFrame()
def_analyze_single_stock(self,
ts_code: str,
name: str,
start_date: str,
end_date: str) -> Dict:"""
分析单只股票
Returns:
包含分析结果的字典
"""# 获取数据
data = self.fetcher.get_daily_data(ts_code, start_date, end_date)
if data isNoneorlen(data) < 60:
returnNone
# 计算指标
analyzer = StockAnalyzer(data)
enriched = analyzer.calculate_all_indicators()
# 检测信号
detector = SignalDetector(enriched)
result_df = detector.detect_main_uplift()
# 提取最新状态
latest = result_df.iloc[-1]
signal_count = result_df['signal'].sum()
# 计算额外指标
recent_return = (latest['close'] / result_df.iloc[-20]['close'] - 1) * 100
return {
'ts_code': ts_code,
'name': name,
'latest_price': latest['close'],
'latest_date': latest['date'].strftime('%Y-%m-%d'),
'signal_count': signal_count,
'has_current_signal': latest['signal'],
'signal_strength': latest['signal_strength'] if latest['signal'] else0,
'vol_ratio': latest['vol_ratio'],
'rsi': latest['rsi'],
'price_change_3d': latest['price_change_3d'],
'return_20d': recent_return,
'ma_trend': latest['ma_trend']
}
deffilter_strong_stocks(self,
min_strength: int = 2,
min_vol_ratio: float = 1.3) -> pd.DataFrame:"""
筛选强势股票
Args:
min_strength: 最小信号强度
min_vol_ratio: 最小量比
Returns:
符合条件的股票DataFrame
"""ifnotself.results:
return pd.DataFrame()
df = pd.DataFrame(self.results)
strong_stocks = df[
(df['signal_strength'] >= min_strength) &
(df['vol_ratio'] >= min_vol_ratio) &
(df['has_current_signal'] == True)
]
return strong_stocks.sort_values('signal_strength', ascending=False)
# 实战示例:扫描科技股
tech_stocks = {
'000001.SZ': '平安银行',
'000002.SZ': '万科A',
'600519.SH': '贵州茅台',
'601318.SH': '中国平安',
'000858.SZ': '五粮液',
'600036.SH': '招商银行',
'601166.SH': '兴业银行',
'000333.SZ': '美的集团',
'600030.SH': '中信证券'
}
# 初始化筛选器
screener = StockScreener(fetcher)
# 批量扫描
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=180)).strftime('%Y%m%d')
results_df = screener.scan_stock_pool(tech_stocks, start_date, end_date)
# 筛选强势股
strong_stocks = screener.filter_strong_stocks(min_strength=2, min_vol_ratio=1.5)
print("
=== 强势股票榜单 ===")print(strong_stocks[['name', 'latest_price', 'signal_strength',
'vol_ratio', 'price_change_3d', 'return_20d']])
# 导出结果
results_df.to_csv('stock_scan_results.csv', index=False, encoding='utf-8-sig')
print("
完整结果已导出至:stock_scan_results.csv")
classRiskManager:
"""风险管理模块"""
@staticmethod
defcalculate_dynamic_stop_loss(data: pd.DataFrame,
method: str = 'atr',
multiplier: float = 2.0) -> pd.Series:"""
计算动态止损位
Args:
data: 股票数据
method: 止损方法('atr', 'percentage', 'support')
multiplier: ATR倍数或百分比
Returns:
止损价格序列
"""if method == 'atr':
# 基于ATR的止损
return data['close'] - multiplier * data['atr']
elif method == 'percentage':
# 固定百分比止损
return data['close'] * (1 - multiplier / 100)
elif method == 'support':
# 基于支撑位的止损(MA20)
return data['ma20'] * 0.95
else:
raise ValueError(f"未知的止损方法:{method}")
@staticmethod
defcalculate_position_size(account_value: float,
risk_per_trade: float,
entry_price: float,
stop_loss: float) -> int:"""
计算仓位大小
Args:
account_value: 账户总值
risk_per_trade: 单笔交易风险比例(如0.02表示2%)
entry_price: 入场价格
stop_loss: 止损价格
Returns:
建议买入股数
"""risk_amount = account_value * risk_per_trade
risk_per_share = entry_price - stop_loss
if risk_per_share <= 0:
return0
shares = int(risk_amount / risk_per_share)
# 确保不超过账户的50%
max_shares = int((account_value * 0.5) / entry_price)
returnmin(shares, max_shares)
@staticmethod
defcalculate_risk_reward_ratio(entry: float,
stop_loss: float,
target: float) -> float:"""
计算风险收益比
Args:
entry: 入场价
stop_loss: 止损价
target: 目标价
Returns:
风险收益比(>2为佳)
"""risk = entry - stop_loss
reward = target - entry
if risk <= 0:
return0
return reward / risk
# 使用示例
risk_mgr = RiskManager()
# 为分析结果添加止损位
result_df['stop_loss_atr'] = risk_mgr.calculate_dynamic_stop_loss(
result_df, method='atr', multiplier=2.0
)
result_df['stop_loss_pct'] = risk_mgr.calculate_dynamic_stop_loss(
result_df, method='percentage', multiplier=5.0
)
# 计算建议仓位
account_value = 100000# 10万账户
latest = result_df.iloc[-1]
position_size = risk_mgr.calculate_position_size(
account_value=account_value,
risk_per_trade=0.02, # 单笔风险2%
entry_price=latest['close'],
stop_loss=latest['stop_loss_atr']
)
print(f"
=== 风险管理建议 ===")print(f"当前价格:{latest['close']:.2f}")
print(f"ATR止损位:{latest['stop_loss_atr']:.2f}")
print(f"百分比止损位:{latest['stop_loss_pct']:.2f}")
print(f"建议仓位:{position_size}股")
print(f"建议投入:{position_size * latest['close']:.2f}元")
classSimpleBacktester:
"""简易回测引擎"""
def__init__(self, initial_capital: float = 100000):
"""
初始化回测引擎
Args:
initial_capital: 初始资金
"""self.initial_capital = initial_capital
self.capital = initial_capital
self.positions = {}
self.trades = []
self.equity_curve = []
defrun_backtest(self, data: pd.DataFrame) -> Dict:
"""
运行回测
Args:
data: 包含信号的股票数据
Returns:
回测统计结果
"""for idx, row in data.iterrows():
date = row['date']
# 检查买入信号
if row['signal'] andnotself.positions:
self._open_position(row)
# 检查卖出信号(这里简化为持有5天)
elifself.positions:
entry_date = self.positions['entry_date']
hold_days = (date - entry_date).days
# 止损或止盈
if row['close'] <= self.positions['stop_loss']:
self._close_position(row, reason='止损')
elif row['close'] >= self.positions['target']:
self._close_position(row, reason='止盈')
elif hold_days >= 5:
self._close_position(row, reason='持有期满')
# 记录权益曲线
equity = self._calculate_equity(row)
self.equity_curve.append({
'date': date,
'equity': equity
})
returnself._calculate_statistics()
def_open_position(self, row: pd.Series):
"""开仓"""
entry_price = row['close']
stop_loss = row['stop_loss_atr']
target = entry_price * 1.10# 10%目标收益
shares = int((self.capital * 0.3) / entry_price) # 30%仓位
cost = shares * entry_price
self.positions = {
'entry_date': row['date'],
'entry_price': entry_price,
'shares': shares,
'cost': cost,
'stop_loss': stop_loss,
'target': target
}
self.capital -= cost
def_close_position(self, row: pd.Series, reason: str):
"""平仓"""
exit_price = row['close']
shares = self.positions['shares']
proceeds = shares * exit_price
profit = proceeds - self.positions['cost']
profit_pct = (profit / self.positions['cost']) * 100
self.trades.append({
'entry_date': self.positions['entry_date'],
'exit_date': row['date'],
'entry_price': self.positions['entry_price'],
'exit_price': exit_price,
'shares': shares,
'profit': profit,
'profit_pct': profit_pct,
'reason': reason
})
self.capital += proceeds
self.positions = {}
def_calculate_equity(self, row: pd.Series) -> float:
"""计算当前权益"""
ifself.positions:
market_value = self.positions['shares'] * row['close']
returnself.capital + market_value
returnself.capital
def_calculate_statistics(self) -> Dict:
"""计算回测统计"""
ifnotself.trades:
return {'error': '无交易记录'}
trades_df = pd.DataFrame(self.trades)
total_return = (self.capital - self.initial_capital) / self.initial_capital * 100
win_rate = len(trades_df[trades_df['profit'] > 0]) / len(trades_df) * 100
avg_profit = trades_df['profit'].mean()
max_profit = trades_df['profit'].max()
max_loss = trades_df['profit'].min()
return {
'total_trades': len(trades_df),
'win_rate': win_rate,
'total_return': total_return,
'final_capital': self.capital,
'avg_profit': avg_profit,
'max_profit': max_profit,
'max_loss': max_loss,
'trades': trades_df
}
# 回测示例
backtester = SimpleBacktester(initial_capital=100000)
backtest_results = backtester.run_backtest(result_df)
print("
=== 回测结果 ===")print(f"总交易次数:{backtest_results['total_trades']}")
print(f"胜率:{backtest_results['win_rate']:.2f}%")
print(f"总收益率:{backtest_results['total_return']:.2f}%")
print(f"最终资金:{backtest_results['final_capital']:.2f}元")
print(f"平均盈亏:{backtest_results['avg_profit']:.2f}元")
#!/usr/bin/env python3
"""
主拉升行情检测系统 - 完整执行脚本
"""import os
import sys
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
defmain():
"""主执行函数"""
print("=" * 60)
print("股票主拉升行情检测系统 v1.0")
print("=" * 60)
# 1. 环境检查
token = os.getenv('TUSHARE_TOKEN')
ifnot token:
print("错误:未设置TUSHARE_TOKEN环境变量")
sys.exit(1)
# 2. 参数配置
config = {
'stock_pool': {
'000001.SZ': '平安银行',
'600519.SH': '贵州茅台',
'601318.SH': '中国平安'
},
'lookback_days': 180,
'detection_config': {
'vol_ratio_threshold': 1.5,
'price_change_threshold': 5.0,
'rsi_range': (40, 80),
'bb_breakout': False, # 宽松模式
'ma_alignment': True,
'min_signal_gap': 3
}
}
# 3. 初始化组件
print("
[1/5] 初始化数据获取器...")fetcher = StockDataFetcher(token)
print("[2/5] 批量获取股票数据...")
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=config['lookback_days'])).strftime('%Y%m%d')
screener = StockScreener(fetcher)
results_df = screener.scan_stock_pool(
config['stock_pool'],
start_date,
end_date
)
# 4. 筛选强势股
print("
[3/5] 筛选强势股票...")strong_stocks = screener.filter_strong_stocks(min_strength=2, min_vol_ratio=1.3)
if strong_stocks.empty:
print("未发现符合条件的强势股票")
else:
print(f"
发现 {len(strong_stocks)} 只强势股票:")print(strong_stocks[['name', 'latest_price', 'signal_strength',
'vol_ratio', 'price_change_3d']])
# 5. 生成详细报告
print("
[4/5] 生成可视化报告...")for _, stock in strong_stocks.head(3).iterrows():
ts_code = stock['ts_code']
name = stock['name']
# 重新获取完整数据
data = fetcher.get_daily_data(ts_code, start_date, end_date)
analyzer = StockAnalyzer(data)
enriched = analyzer.calculate_all_indicators()
detector = SignalDetector(enriched, config['detection_config'])
result = detector.detect_main_uplift()
# 生成图表
chart_gen = ChartGenerator(result, result)
chart_gen.plot_comprehensive_analysis(
name, ts_code,
save_path=f'reports/{ts_code}_analysis.png'
)
# 6. 导出结果
print("
[5/5] 导出结果...")timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
results_df.to_csv(f'reports/scan_results_{timestamp}.csv',
index=False, encoding='utf-8-sig')
print("
" + "=" * 60)print("分析完成!")
print(f"结果已保存至 reports/ 目录")
print("=" * 60)
if __name__ == '__main__':
# 创建输出目录
os.makedirs('reports', exist_ok=True)
# 执行主程序
try:
main()
except KeyboardInterrupt:
print("
用户中断执行")sys.exit(0)
except Exception as e:
print(f"
执行出错:{str(e)}")import traceback
traceback.print_exc()
sys.exit(1)
1. 数据质量保障
定期验证 Tushare 接口可用性
对异常数据进行清洗和插值
保存历史数据作为本地缓存
2. 参数调优策略
不同市场环境使用不同参数组合
牛市:放宽条件,提高灵敏度
熊市:收紧条件,降低误报率
震荡市:增加过滤条件
3. 信号验证流程
技术信号 + 基本面筛选
多时间周期共振确认
板块联动分析
资金流向验证
4. 风险控制铁律
单笔交易风险不超过账户的 2%
总仓位不超过账户的 60%
严格执行止损,不抱侥幸心理
分散投资,不集中单一标的
我们构建的这套主拉升行情检测系统,本质上是将经验丰富的交易员的思维模式转化为可执行的代码逻辑。它的价值不在于预测未来(没有任何系统能做到这一点),而在于:
提高效率:从数千只股票中快速筛选出值得关注的标的
减少情绪干扰:用客观数据替代主观判断
持续优化:通过回测不断改进策略参数
风险量化:将模糊的“感觉”转化为精确的数字
但请永远记住:工具只是工具,最终的决策权在你手中。
市场是复杂的非线性系统,任何技术指标都只是从某个角度观察市场的窗口。真正的投资智慧,来自于对市场本质的深刻理解、对风险的敬畏之心,以及在实战中不断积累的经验。
愿这套系统能成为你投资旅程中的得力助手,而非唯一依赖。
读到这里,你可能已经对量化交易系统的构建有了完整的认知。但如果你现在就想立即上手,而不想被环境配置、依赖管理这些琐事困扰,那么我们为你准备了一条更平滑的学习曲线。
这是一个将本文核心能力封装成即插即用形态的官方技能包,由 Tushare 团队与 ClawHub 社区联合打造。
📦 立即获取:https://clawhub.ai/lidayan/tushare-data
对于初学者:你可以先通过技能包快速体验量化交易的魅力,感受数据驱动决策的威力,然后再回过头来研究本文的实现细节。这种“先用起来,再深入理解”的路径,往往比一开始就陷入代码细节更容易建立信心。
对于实战者:如果你的核心目标是策略研究而非基础设施建设,技能包可以让你将精力集中在信号逻辑优化、参数调优这些真正产生 alpha 的环节,而不是重复造轮子。
对于系统开发者:技能包可以作为你大型量化平台的标准数据获取模块。官方维护意味着接口变更会被及时适配,你无需为 Tushare 的版本升级操心。
| 零配置启动 | |
| 官方维护 | |
| 同步更新 | |
| 社区驱动 | |
| 模块化设计 |
策略快速验证:在几分钟内获取数据并测试你的交易想法
教学演示:课堂上无需为环境配置浪费时间,直接进入核心内容
数据探索:交互式地探索不同股票的历史数据和技术指标
生产级数据层:作为你量化系统的标准化数据获取模块
| 上手速度 | ||
| 学习曲线 | ||
| 定制能力 | ||
| 维护成本 | ||
| 适用人群 |
我们的建议:如果你是量化交易的新手,从技能包开始是明智的选择。当你对整个流程有了感性认识,再回来研究本文的实现细节,会有“原来如此”的顿悟感。如果你已经是经验丰富的开发者,或者需要高度定制化的功能,那么自建系统能给你最大的灵活性。
两者并不互斥:很多用户的实践是,用技能包快速搭建原型验证策略逻辑,确认方向后再用本文的代码构建生产级系统。这种渐进式的路径,既高效又稳健。
💬 我们期待你的反馈
无论你选择技能包还是自建系统,我们都欢迎你分享使用体验、提出改进建议,或者报告遇到的问题。量化交易的魅力在于持续迭代,而社区的智慧是推动进化的最强动力。
📧 联系方式:通过 ClawHub 技能页面 留言,或在 GitHub 提交 Issue。
免责声明
本文所有内容仅供学习交流,不构成任何投资建议。股市有风险,投资需谨慎。任何基于本文内容的投资决策,风险自负。
技术支持
Tushare 官方文档:https://tushare.pro/document
附录:快速参考手册
pro.daily() | pro.daily(ts_code='000001. SZ', start_date='20260101') | |
pro.stock_basic() | pro.stock_basic(exchange='', list_status='L') | |
pro.adj_factor() | pro.adj_factor(ts_code='000001. SZ') | |
pro.moneyflow() | pro.moneyflow(ts_code='000001. SZ') | |
pro.fina_indicator() | pro.fina_indicator(ts_code='000001. SZ', period='20251231') |