发布日期:2024年7月15日
一、系统架构设计
本教程将构建一个完整的量化交易系统,包含以下核心模块:
- 数据采集层:多源金融数据获取
- 数据处理层:技术指标计算与特征工程
- 策略开发层:交易信号生成
- 回测引擎层:历史数据模拟交易
- 风险控制层:最大回撤与止损策略
技术栈:Python 3.9 + Pandas + NumPy + TA-Lib + Backtrader
二、环境准备与项目初始化
1. 安装核心依赖
# 创建虚拟环境
python -m venv quant-env
source quant-env/bin/activate # Linux/Mac
quant-envScriptsactivate # Windows
# 安装依赖包
pip install pandas numpy matplotlib requests
pip install backtrader TA-Lib ccxt
pip install yfinance akshare
2. 项目目录结构
quant-system/
├── data/ # 数据存储
│ ├── raw/ # 原始数据
│ └── processed/ # 处理后的数据
├── strategies/ # 交易策略
│ ├── indicators/ # 自定义指标
│ └── backtests/ # 回测结果
├── utils/ # 工具函数
│ ├── data_fetcher.py # 数据获取
│ └── risk_manager.py # 风险管理
├── config.py # 配置文件
└── main.py # 主程序入口
三、金融数据采集
1. 多源数据获取
# utils/data_fetcher.py
import yfinance as yf
import akshare as ak
import ccxt
class DataFetcher:
@staticmethod
def get_stock_data(symbol, start, end):
"""从Yahoo Finance获取股票数据"""
data = yf.download(
symbol,
start=start,
end=end,
progress=False
)
return data
@staticmethod
def get_fund_data(symbol):
"""从AKShare获取基金数据"""
return ak.fund_em_open_fund_info(symbol)
@staticmethod
def get_crypto_data(symbol, exchange='binance'):
"""从加密货币交易所获取数据"""
api = getattr(ccxt, exchange)()
ohlcv = api.fetch_ohlcv(
symbol,
'1d',
limit=1000
)
return pd.DataFrame(
ohlcv,
columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
)
2. 数据清洗与存储
def clean_data(df):
"""数据清洗处理"""
# 处理缺失值
df = df.fillna(method='ffill')
# 去除异常值
df = df[(df['high'] - df['low']) > 0]
# 标准化列名
df.columns = df.columns.str.lower()
# 添加收益率
df['return'] = df['close'].pct_change()
return df
def save_to_database(df, name, engine):
"""存储到SQLite数据库"""
df.to_sql(
name,
engine,
if_exists='replace',
index=True
)
四、技术指标计算
1. 常用指标实现
# strategies/indicators/technical.py
import talib
import numpy as np
class TechnicalIndicators:
@staticmethod
def sma(df, window=20):
"""简单移动平均"""
return talib.SMA(df['close'], timeperiod=window)
@staticmethod
def rsi(df, window=14):
"""相对强弱指数"""
return talib.RSI(df['close'], timeperiod=window)
@staticmethod
def macd(df, fast=12, slow=26, signal=9):
"""MACD指标"""
macd, signal, _ = talib.MACD(
df['close'],
fastperiod=fast,
slowperiod=slow,
signalperiod=signal
)
return macd, signal
@staticmethod
def bollinger_bands(df, window=20):
"""布林带"""
upper, middle, lower = talib.BBANDS(
df['close'],
timeperiod=window,
nbdevup=2,
nbdevdn=2,
matype=0
)
return upper, middle, lower
2. 自定义指标开发
class CustomIndicators:
@staticmethod
def volume_spike(df, window=20, threshold=2):
"""成交量异动"""
mean_vol = df['volume'].rolling(window).mean()
std_vol = df['volume'].rolling(window).std()
return (df['volume'] - mean_vol) > (threshold * std_vol)
@staticmethod
def price_momentum(df, window=10):
"""价格动量"""
return df['close'] / df['close'].shift(window) - 1
@staticmethod
def volatility(df, window=20):
"""波动率计算"""
returns = df['close'].pct_change()
return returns.rolling(window).std() * np.sqrt(252)
五、交易策略开发
1. 双均线策略实现
# strategies/dual_moving_average.py
import backtrader as bt
class DualMovingAverageStrategy(bt.Strategy):
params = (
('fast', 10),
('slow', 30),
('printlog', False)
)
def __init__(self):
# 初始化指标
self.fast_ma = bt.indicators.SMA(
self.data.close,
period=self.p.fast
)
self.slow_ma = bt.indicators.SMA(
self.data.close,
period=self.p.slow
)
self.crossover = bt.indicators.CrossOver(
self.fast_ma,
self.slow_ma
)
def next(self):
if not self.position:
if self.crossover > 0: # 快线上穿慢线
self.buy()
elif self.crossover < 0: # 快线下穿慢线
self.close()
def log(self, txt, dt=None, doprint=False):
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()}, {txt}')
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status == order.Completed:
if order.isbuy():
self.log(f'买入执行, 价格: {order.executed.price:.2f}')
elif order.issell():
self.log(f'卖出执行, 价格: {order.executed.price:.2f}')
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('订单取消/保证金不足/被拒绝')
2. 均值回归策略
# strategies/mean_reversion.py
class MeanReversionStrategy(bt.Strategy):
params = (
('bb_window', 20),
('rsi_window', 14),
('rsi_overbought', 70),
('rsi_oversold', 30)
)
def __init__(self):
# 布林带指标
self.upper, self.middle, self.lower = bt.indicators.BollingerBands(
self.data.close,
period=self.p.bb_window
)
# RSI指标
self.rsi = bt.indicators.RSI(
self.data.close,
period=self.p.rsi_window
)
def next(self):
if not self.position:
# 价格触及下轨且RSI超卖
if (self.data.close[0] <= self.lower[0] and
self.rsi[0] = self.upper[0] and
self.rsi[0] >= self.p.rsi_overbought):
self.close()
六、回测引擎实现
1. 回测系统配置
# backtest_engine.py
import backtrader as bt
class BacktestEngine:
def __init__(self):
self.cerebro = bt.Cerebro()
self.cerebro.broker.setcash(100000.0)
self.cerebro.broker.setcommission(commission=0.001) # 0.1%佣金
def add_data(self, df, name='stock'):
"""添加数据到回测引擎"""
data = bt.feeds.PandasData(dataname=df)
self.cerebro.adddata(data, name=name)
def add_strategy(self, strategy, **params):
"""添加交易策略"""
self.cerebro.addstrategy(strategy, **params)
def run_backtest(self):
"""执行回测"""
print('初始资金: %.2f' % self.cerebro.broker.getvalue())
self.cerebro.run()
print('最终资金: %.2f' % self.cerebro.broker.getvalue())
def plot_result(self):
"""绘制回测结果"""
self.cerebro.plot(style='candlestick')
2. 回测结果分析
def analyze_result(self):
"""分析回测绩效"""
# 获取回测结果
strat = self.cerebro.runstrats[0][0]
# 计算关键指标
total_return = self.cerebro.broker.getvalue() / 100000 - 1
sharpe_ratio = strat.analyzers.sharpe.get_analysis()['sharperatio']
max_drawdown = strat.analyzers.drawdown.get_analysis()['max']['drawdown']
# 打印结果
print(f'总收益率: {total_return:.2%}')
print(f'夏普比率: {sharpe_ratio:.2f}')
print(f'最大回撤: {max_drawdown:.2%}')
# 返回交易记录
return strat.analyzers.trades.get_analysis()
七、风险管理模块
1. 止损策略实现
# utils/risk_manager.py
class RiskManager:
@staticmethod
def fixed_stop_loss(strategy, loss_pct=0.05):
"""固定比例止损"""
if strategy.position:
price = strategy.position.price
current = strategy.data.close[0]
loss = (price - current) / price
if loss >= loss_pct:
strategy.close()
strategy.log(f'触发止损, 亏损比例: {loss:.2%}')
@staticmethod
def trailing_stop(strategy, trail_pct=0.03):
"""移动止损"""
if not hasattr(strategy, 'highest_price'):
strategy.highest_price = strategy.data.close[0]
if strategy.position:
current = strategy.data.close[0]
strategy.highest_price = max(current, strategy.highest_price)
if current <= strategy.highest_price * (1 - trail_pct):
strategy.close()
strategy.log(f'触发移动止损, 最高价: {strategy.highest_price:.2f}')
2. 仓位控制
class PositionSizer:
@staticmethod
def fixed_size(strategy, size=100):
"""固定数量"""
return size
@staticmethod
def percent_risk(strategy, risk_pct=0.01, stop_pct=0.05):
"""风险比例控制"""
account_value = strategy.broker.getvalue()
risk_amount = account_value * risk_pct
price = strategy.data.close[0]
stop_loss = price * stop_pct
return int(risk_amount / stop_loss)
@staticmethod
def volatility_adjusted(strategy, target_vol=0.2, window=20):
"""波动率调整"""
returns = strategy.data.close.get(size=window, ago=0) /
strategy.data.close.get(size=window, ago=1) - 1
vol = returns.std() * np.sqrt(252)
if vol > 0:
return int(target_vol / vol * 100)
return 100
八、总结与扩展
通过本教程,您已经掌握了:
- 金融数据采集与清洗
- 技术指标计算与分析
- 交易策略开发实现
- 回测系统构建与优化
- 风险管理体系设计
扩展学习方向:
- 机器学习策略开发
- 高频交易系统设计
- 实盘交易接口对接
- 多因子模型构建