统计套利详解
最经典的量化策略:从数学原理到完整代码实现
学习目标
- 理解统计套利和配对交易的核心原理
- 掌握协整检验的数学原理和 Python 实现
- 学会构建完整的配对交易系统
- 了解统计套利的风险和 ML 增强方法
一、配对交易原理
1.1 统计套利的核心思想
统计套利(Statistical Arbitrage)基于一个基本观察:某些资产之间存在长期稳定的关系,当这种关系被打破时,就存在交易机会。
┌─────────────────────────────────────────────────────────────┐
│ 统计套利的核心逻辑 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 正常状态:股票A和股票B价格关系稳定 │
│ │
│ A ─────────────────────── B │
│ ↖ ↗ │
│ ↖ ↗ │
│ 价差稳定在一定范围 │
│ │
│ 异常状态:价差异常扩大 │
│ │
│ A ───────────────── B ←───── 套利机会 │
│ ↖ ↗ │
│ ↖ 价差异常扩大 ↗ │
│ │
│ 交易动作: │
│ • 做空"相对昂贵"的资产 │
│ • 做多"相对便宜"的资产 │
│ • 等待价差回归平仓获利 │
│ │
└─────────────────────────────────────────────────────────────┘
1.2 什么是价差的平稳性
**平稳性(Stationarity)**是统计套利的数学基础。
非平稳序列(如股票价格):
- 均值随时间变化
- 方差随时间变化
- 有趋势,不回归任何固定值
平稳序列(如价差):
- 均值恒定
- 方差恒定
- 围绕均值波动,有回归倾向
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# 对比非平稳和平稳序列
np.random.seed(42)
n = 500
# 非平稳:随机游走(股票价格)
non_stationary = pd.Series(
np.cumsum(np.random.normal(0, 1, n)),
index=pd.date_range('2023-01-01', periods=n, freq='D'),
name='Stock Price'
)
# 平稳:Ornstein-Uhlenbeck 过程(价差)
stationary = []
x = 0
theta = 0.1 # 回归速度
mu = 0 # 均值
sigma = 1 # 波动率
for _ in range(n):
dx = theta * (mu - x) + sigma * np.random.normal(0, 1)
x += dx
stationary.append(x)
stationary = pd.Series(
stationary,
index=pd.date_range('2023-01-01', periods=n, freq='D'),
name='Spread'
)
# 可视化对比
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
axes[0].plot(non_stationary.index, non_stationary.values)
axes[0].set_title('非平稳序列:股票价格(随机游走)')
axes[0].set_ylabel('Price')
axes[0].grid(True, alpha=0.3)
axes[0].text(non_stationary.index[50], non_stationary.max() * 0.9,
'• 无固定均值\n• 有趋势\n• 不回归', fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
axes[1].plot(stationary.index, stationary.values)
axes[1].axhline(y=0, color='red', linestyle='--', alpha=0.7, label='Mean')
axes[1].fill_between(stationary.index, -2, 2, alpha=0.1, color='green')
axes[1].set_title('平稳序列:价差(均值回归)')
axes[1].set_ylabel('Spread')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].text(stationary.index[50], stationary.max() * 0.8,
'• 固定均值 (0)\n• 围绕均值波动\n• 有回归倾向', fontsize=10,
bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
plt.tight_layout()
plt.show()

1.3 Ornstein-Uhlenbeck 过程
价差的行为可以用 OU 过程描述:

$$dX_t = \theta (\mu - X_t)\,dt + \sigma\,dW_t$$

其中:
- $X_t$ = 价差在时间 t 的值
- $\mu$ = 价差的长期均值
- $\theta$ = 回归速度(越大回归越快)
- $\sigma$ = 波动率
- $dW_t$ = 维纳过程增量

OU 过程的离散化形式:

$$X_{t+1} = X_t + \theta (\mu - X_t)\,\Delta t + \sigma \sqrt{\Delta t}\,\epsilon_t$$

其中 $\epsilon_t \sim N(0, 1)$
def simulate_ou_process(n_periods, theta=0.1, mu=0, sigma=1, x0=0, dt=1):
"""
模拟 Ornstein-Uhlenbeck 过程
参数:
n_periods: 模拟期数
theta: 回归速度 (0-1,越大回归越快)
mu: 长期均值
sigma: 扩散系数(波动率)
x0: 初始值
dt: 时间步长
返回:
ou_series: OU 过程序列
"""
x = x0
ou_values = []
for _ in range(n_periods):
dx = theta * (mu - x) * dt + sigma * np.sqrt(dt) * np.random.normal(0, 1)
x += dx
ou_values.append(x)
return pd.Series(ou_values)
# 比较不同回归速度
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
thetas = [0.01, 0.05, 0.2, 0.5]
titles = ['弱回归 (θ=0.01)', '中等回归 (θ=0.05)', '强回归 (θ=0.2)', '很强回归 (θ=0.5)']
for i, (ax, theta, title) in enumerate(zip(axes.flat, thetas, titles)):
np.random.seed(42 + i)
ou = simulate_ou_process(500, theta=theta, mu=0, sigma=2)
ax.plot(ou.index, ou.values, alpha=0.7)
ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 计算半衰期
half_life = np.log(2) / theta if theta > 0 else float('inf')
ax.set_title(f'{title}\n半衰期 ≈ {half_life:.1f} 期')
ax.grid(True, alpha=0.3)
plt.suptitle('不同回归速度的 OU 过程', fontsize=14)
plt.tight_layout()
plt.show()

**半衰期(Half-Life)**:价差偏离均值后,回归到一半偏离度所需的时间,即 $t_{1/2} = \ln 2 / \theta$:
def calculate_half_life(spread):
"""
计算价差的半衰期
通过回归估计: spread_{t+1} - spread_t = theta * (mu - spread_t) + epsilon
"""
# 准备数据
y = spread.diff().dropna()
x = spread.shift(1).dropna()
# 对齐
y = y[y.index.isin(x.index)]
x = x[x.index.isin(y.index)]
# 线性回归
from scipy import stats
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
# theta = -slope
theta = -slope
if theta > 0:
half_life = np.log(2) / theta
return half_life, theta
else:
        return float('inf'), theta

二、协整检验 (Cointegration Test)
协整是统计套利的数学基础。协整 ≠ 相关性,这是初学者最容易混淆的概念。
2.1 协整 vs 相关性
| 维度 | 协整 (Cointegration) | 相关性 (Correlation) |
|---|---|---|
| 定义 | 长期均衡关系 | 线性关系强度 |
| 关注点 | 价差的平稳性 | 变动方向一致性 |
| 序列属性 | 要求序列是非平稳的 | 无要求 |
| 应用 | 配对交易、统计套利 | 投资组合分散 |
| 典型例子 | 股票价格与期货价格 | 同行业股票走势 |
关键区别:
相关性高 ≠ 协整关系
- 两只股票可能高度相关(同涨同跌)
- 但价差可能持续扩大(没有协整关系)
- 这种情况下做配对交易会亏损
协整关系强 ≠ 相关性高
- 理论上协整的资产可能有短期低相关
- 但长期价差会回归
2.2 数学定义
两个时间序列 $X_t$ 和 $Y_t$ 协整,如果:
- $X_t$ 和 $Y_t$ 都是 $I(1)$ 序列(一阶差分平稳)
- 存在线性组合 $Y_t - \beta X_t$ 是 $I(0)$ 序列(平稳)

直觉理解:虽然两个序列各自都在"漫游",但它们之间保持某种关系,不会无限偏离。
2.3 Engle-Granger 两步法
最常用的协整检验方法。
步骤一:回归求长期关系

$$Y_t = \alpha + \beta X_t + \epsilon_t$$

步骤二:检验残差平稳性

$$\hat{\epsilon}_t = Y_t - \hat{\alpha} - \hat{\beta} X_t$$

如果 $\hat{\epsilon}_t$ 平稳,则 $X_t$ 和 $Y_t$ 协整。
2.3.1 完整 Python 实现
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller
from scipy import stats
import matplotlib.pyplot as plt
def engle_granger_cointegration_test(series1, series2, significance_level=0.05):
"""
Engle-Granger 两步法协整检验
参数:
series1, series2: 两个价格序列
significance_level: 显著性水平
返回:
results: 包含检验结果的字典
"""
# 确保对齐
df = pd.DataFrame({'y': series1, 'x': series2}).dropna()
y = df['y'].values
x = df['x'].values
results = {
'series1_name': series1.name if hasattr(series1, 'name') else 'Series1',
'series2_name': series2.name if hasattr(series2, 'name') else 'Series2',
'is_cointegrated': False,
'hedge_ratio': None,
'p_value': None,
'critical_values': None,
'spread': None,
'test_statistic': None
}
# ===== 步骤一:长期关系回归 =====
# 添加常数项
x_with_const = np.column_stack([np.ones(len(x)), x])
# OLS 回归
beta_hat = np.linalg.lstsq(x_with_const, y, rcond=None)[0]
alpha, beta = beta_hat[0], beta_hat[1]
# 计算残差(价差)
spread = y - alpha - beta * x
# ===== 步骤二:ADF 检验残差平稳性 =====
# 注意:ADF 分布不标准,需要用 MacKinnon 临界值
adf_result = adfuller(spread, maxlag=1, regression='c')
results['test_statistic'] = adf_result[0]
results['p_value'] = adf_result[1]
results['critical_values'] = adf_result[4]
results['hedge_ratio'] = beta
results['spread'] = pd.Series(spread, index=df.index)
results['alpha'] = alpha
# 判断是否协整
# 注:实际应使用 Engle-Granger 临界值,这里简化使用 ADF 临界值
if results['p_value'] < significance_level:
results['is_cointegrated'] = True
return results
def plot_cointegration_analysis(series1, series2, results):
"""可视化协整分析结果"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 原始价格
axes[0, 0].plot(series1.index, series1.values, label=results['series1_name'], alpha=0.7)
axes[0, 0].plot(series2.index, series2.values, label=results['series2_name'], alpha=0.7)
axes[0, 0].set_title('原始价格序列')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 协整关系
normalized_series2 = results['alpha'] + results['hedge_ratio'] * series2
axes[0, 1].plot(series1.index, series1.values, label=results['series1_name'], alpha=0.7)
axes[0, 1].plot(series2.index, normalized_series2.values,
label=f"{results['series2_name']} (adjusted)", alpha=0.7)
axes[0, 1].set_title(f'协整关系 (对冲比率 = {results["hedge_ratio"]:.3f})')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 价差
spread = results['spread']
spread_mean = spread.mean()
spread_std = spread.std()
axes[1, 0].plot(spread.index, spread.values, label='Spread', color='steelblue')
axes[1, 0].axhline(y=spread_mean, color='red', linestyle='--', label='Mean', alpha=0.7)
axes[1, 0].axhline(y=spread_mean + 2*spread_std, color='green', linestyle=':', label='±2σ', alpha=0.5)
axes[1, 0].axhline(y=spread_mean - 2*spread_std, color='green', linestyle=':', alpha=0.5)
axes[1, 0].fill_between(spread.index, spread_mean - 2*spread_std, spread_mean + 2*spread_std, alpha=0.1)
axes[1, 0].set_title('价差(残差)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 4. ADF 检验结果
axes[1, 1].axis('off')
test_text = f"""
ADF 检验结果:
检验统计量: {results['test_statistic']:.4f}
p 值: {results['p_value']:.4f}
临界值:
• 1%: {results['critical_values']['1%']:.4f}
• 5%: {results['critical_values']['5%']:.4f}
• 10%: {results['critical_values']['10%']:.4f}
结论: {'✅ 协整关系存在' if results['is_cointegrated'] else '❌ 无协整关系'}
对冲比率 (β): {results['hedge_ratio']:.4f}
"""
axes[1, 1].text(0.1, 0.5, test_text, fontsize=12, verticalalignment='center',
bbox=dict(boxstyle='round', facecolor='wheat' if results['is_cointegrated'] else 'lightcoral', alpha=0.5))
plt.suptitle('协整分析报告', fontsize=14)
plt.tight_layout()
plt.show()
return fig
# ============ 模拟协整数据演示 ============
np.random.seed(42)
n_periods = 500
# 生成协整数据
# y_t = α + β * x_t + ε_t,其中 ε_t 服从平稳过程
# x: 随机游走
x = np.cumsum(np.random.normal(0, 1, n_periods))
# y: 与 x 协整
beta_true = 1.5
alpha_true = 10
noise = np.random.normal(0, 2, n_periods)
y = alpha_true + beta_true * x + noise
# 转换为 Series
dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
series_x = pd.Series(x, index=dates, name='Stock_X')
series_y = pd.Series(y, index=dates, name='Stock_Y')
# 运行协整检验
results = engle_granger_cointegration_test(series_y, series_x)
print(f"协整检验结果: {'通过' if results['is_cointegrated'] else '未通过'}")
print(f"估计对冲比率: {results['hedge_ratio']:.4f} (真实值: {beta_true})")
print(f"ADF p-value: {results['p_value']:.6f}")
# 可视化
plot_cointegration_analysis(series_y, series_x, results)
# ============ 对比:非协整数据 ============
# 生成两个独立的随机游走
np.random.seed(123)
x_non = np.cumsum(np.random.normal(0, 1, n_periods))
y_non = np.cumsum(np.random.normal(0, 1, n_periods)) + 100
series_x_non = pd.Series(x_non, index=dates, name='Stock_X_Non')
series_y_non = pd.Series(y_non, index=dates, name='Stock_Y_Non')
# 检验
results_non = engle_granger_cointegration_test(series_y_non, series_x_non)
print(f"\n非协整检验结果: {'通过' if results_non['is_cointegrated'] else '未通过'}")
print(f"ADF p-value: {results_non['p_value']:.6f}")

2.4 Johansen 检验(多变量协整)
当涉及多个资产时,使用 Johansen 检验更合适。
from statsmodels.tsa.vector_ar.vecm import coint_johansen
def johansen_cointegration_test(prices_df, det_order=0, k_ar_diff=1):
"""
Johansen 协整检验(多变量)
参数:
prices_df: 价格 DataFrame (每列一个资产)
det_order: 确定性趋势阶数 (0=无, 1=常数, 2=常数+趋势)
k_ar_diff: VAR 滞后阶数
返回:
results: 检验结果
"""
# 准备数据
data = prices_df.values
# Johansen 检验
result = coint_johansen(data, det_order, k_ar_diff)
    results = {
        'eig': result.eig,    # 特征值
        'lr1': result.lr1,    # 迹统计量(statsmodels 中的属性名为 lr1)
        'lr2': result.lr2,    # 最大特征值统计量(属性名为 lr2)
        'cvt': result.cvt,    # 迹统计量临界值 (90%, 95%, 99%)
        'cvm': result.cvm,    # 最大特征值临界值
        'evec': result.evec   # 特征向量(协整向量)
    }
    # 解释结果
    r_trace = []
    for i, (stat, crit) in enumerate(zip(results['lr1'], results['cvt'][:, 1])):  # 使用 95% 临界值
r_trace.append({
'rank': i,
'trace_statistic': stat,
'critical_value_95': crit,
'reject': stat > crit
})
results['trace_test'] = r_trace
    # 协整秩 = 从 r=0 起依次被拒绝的原假设个数(第一个无法拒绝的 H0 给出秩)
    rank = 0
    for r in r_trace:
        if not r['reject']:
            break
        rank += 1
    results['cointegration_rank'] = rank
return results
# 演示 Johansen 检验
np.random.seed(42)
n_periods = 500
# 生成三个协整的序列
# 共同因子
common_factor = np.cumsum(np.random.normal(0, 1, n_periods))
# 三个序列受共同因子驱动
z1 = 100 + 1.0 * common_factor + np.random.normal(0, 2, n_periods)
z2 = 80 + 1.2 * common_factor + np.random.normal(0, 1.5, n_periods)
z3 = 120 + 0.8 * common_factor + np.random.normal(0, 2.5, n_periods)
dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
prices_3 = pd.DataFrame({
'Stock_A': z1,
'Stock_B': z2,
'Stock_C': z3
}, index=dates)
# Johansen 检验
results_johansen = johansen_cointegration_test(prices_3)
print("Johansen 协整检验结果:")
print(f"估计的协整秩: {results_johansen['cointegration_rank']}")
print("\n迹统计量检验:")
for r in results_johansen['trace_test']:
print(f" H0: 协整秩 = {r['rank']}")
print(f" 迹统计量: {r['trace_statistic']:.2f}")
print(f" 95% 临界值: {r['critical_value_95']:.2f}")
print(f" 结论: {'拒绝 H0' if r['reject'] else '无法拒绝 H0'}")
# 协整向量(标准化第一个为1)
print("\n协整向量:")
for i, vec in enumerate(results_johansen['evec'].T):
vec_norm = vec / vec[0]
    print(f" 向量 {i+1}: {vec_norm}")

三、配对选择方法
找到好的配对是成功的关键。
3.1 距离法(Distance Method)
逻辑:历史上价格走势接近的股票,可能是好的配对。
def distance_method_pairing(prices_df, lookback=252, top_n=10):
"""
距离法选择配对
度量:价格序列的欧氏距离(或 SSD:平方和距离)
参数:
prices_df: 价格 DataFrame
lookback: 回看期
top_n: 返回前 N 个配对
返回:
pairs: 配对列表 [(stock1, stock2, distance), ...]
"""
# 使用最近 lookback 期的数据
recent_prices = prices_df.tail(lookback)
# 标准化价格(初始价格都为1)
normalized = recent_prices / recent_prices.iloc[0]
# 计算所有配对的距离
n_stocks = len(prices_df.columns)
distances = []
for i in range(n_stocks):
for j in range(i + 1, n_stocks):
stock1 = prices_df.columns[i]
stock2 = prices_df.columns[j]
# SSD 距离
diff = normalized[stock1] - normalized[stock2]
ssd = np.sum(diff ** 2)
distances.append((stock1, stock2, ssd))
# 排序并返回 top_n
distances.sort(key=lambda x: x[2])
return distances[:top_n]
# 演示距离法
np.random.seed(42)
n_stocks = 20
n_periods = 500
# 生成一些相似的股票对
prices_dict = {}
for i in range(n_stocks):
    if i % 3 == 0 and i + 1 < n_stocks:
        # 创建一对走势相似的股票
        base = np.cumsum(np.random.normal(0.05, 1, n_periods))
        prices_dict[f'Stock_{i}'] = 100 + base + np.random.normal(0, 1, n_periods)
        prices_dict[f'Stock_{i+1}'] = 100 + base * 0.95 + np.random.normal(0, 1, n_periods)
    elif f'Stock_{i}' not in prices_dict:
        # 其余股票为独立随机游走(注意不要覆盖已生成的配对)
        prices_dict[f'Stock_{i}'] = 100 + np.cumsum(np.random.normal(0, 1, n_periods))
prices_demo = pd.DataFrame(prices_dict)
# 使用距离法
pairs = distance_method_pairing(prices_demo, lookback=252, top_n=5)
print("距离法选择的配对:")
for i, (s1, s2, dist) in enumerate(pairs, 1):
    print(f"{i}. {s1} - {s2}: 距离 = {dist:.2f}")

3.2 协整法
逻辑:直接检验协整关系,选择协整性强的配对。
def cointegration_method_pairing(prices_df, significance_level=0.05, top_n=10):
"""
协整法选择配对
对所有可能配对进行协整检验,选择 p 值最小的
参数:
prices_df: 价格 DataFrame
significance_level: 显著性水平
top_n: 返回前 N 个配对
返回:
pairs: 配对列表 [(stock1, stock2, hedge_ratio, p_value), ...]
"""
n_stocks = len(prices_df.columns)
results = []
for i in range(n_stocks):
for j in range(i + 1, n_stocks):
stock1 = prices_df.columns[i]
stock2 = prices_df.columns[j]
# 协整检验
test_result = engle_granger_cointegration_test(
prices_df[stock1],
prices_df[stock2],
significance_level=significance_level
)
if test_result['is_cointegrated']:
results.append((
stock1,
stock2,
test_result['hedge_ratio'],
test_result['p_value']
))
# 按 p 值排序
results.sort(key=lambda x: x[3])
return results[:top_n]
# 演示
pairs_coint = cointegration_method_pairing(prices_demo, significance_level=0.05, top_n=5)
print("\n协整法选择的配对:")
for i, (s1, s2, hedge, pval) in enumerate(pairs_coint, 1):
    print(f"{i}. {s1} - {s2}: 对冲比 = {hedge:.3f}, p 值 = {pval:.4f}")

3.3 机器学习法(聚类)
逻辑:使用聚类算法将相似的股票分组,然后在组内选择配对。
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
def clustering_based_pairing(prices_df, n_clusters=5, top_per_cluster=2):
"""
基于聚类的配对选择
步骤:
1. 提取特征(收益率、波动率等)
2. 聚类
3. 在同一聚类内寻找配对
参数:
prices_df: 价格 DataFrame
n_clusters: 聚类数量
top_per_cluster: 每个聚类返回的配对数
返回:
pairs: 配对列表
"""
# 特征工程
returns = prices_df.pct_change()
features = pd.DataFrame(index=prices_df.columns)
# 各类特征
features['mean_return'] = returns.mean()
features['volatility'] = returns.std()
features['skewness'] = returns.skew()
features['kurtosis'] = returns.kurt()
# 添加一些技术指标特征
for col in prices_df.columns:
price = prices_df[col]
features.loc[col, 'trend'] = (price.iloc[-1] / price.iloc[0] - 1)
# 标准化
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# 聚类
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(features_scaled)
features['cluster'] = clusters
# 在每个聚类内找配对
pairs = []
for cluster_id in range(n_clusters):
cluster_stocks = features[features['cluster'] == cluster_id].index.tolist()
if len(cluster_stocks) >= 2:
# 在聚类内计算距离
for i in range(len(cluster_stocks)):
for j in range(i + 1, len(cluster_stocks)):
s1, s2 = cluster_stocks[i], cluster_stocks[j]
# 计算特征距离
dist = np.linalg.norm(
features_scaled[features.index.get_loc(s1)] -
features_scaled[features.index.get_loc(s2)]
)
pairs.append((s1, s2, cluster_id, dist))
# 按距离排序
pairs.sort(key=lambda x: x[3])
return pairs[:top_per_cluster * n_clusters]
# 可视化聚类
def visualize_clustering(prices_df, n_clusters=5):
"""可视化股票聚类结果"""
# 特征
returns = prices_df.pct_change()
features = pd.DataFrame(index=prices_df.columns)
features['mean_return'] = returns.mean()
features['volatility'] = returns.std()
# 聚类
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(features_scaled)
# PCA 降维用于可视化
pca = PCA(n_components=2)
features_2d = pca.fit_transform(features_scaled)
# 绘图
plt.figure(figsize=(10, 8))
scatter = plt.scatter(features_2d[:, 0], features_2d[:, 1],
c=clusters, cmap='tab10', s=100, alpha=0.7)
# 添加股票标签
for i, stock in enumerate(prices_df.columns):
plt.annotate(stock, (features_2d[i, 0], features_2d[i, 1]),
fontsize=8, alpha=0.8)
plt.colorbar(scatter, label='Cluster')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
plt.title('股票聚类(基于收益-波动特征)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示聚类法
pairs_cluster = clustering_based_pairing(prices_demo, n_clusters=5, top_per_cluster=2)
print("\n聚类法选择的配对:")
for i, (s1, s2, cluster_id, dist) in enumerate(pairs_cluster[:10], 1):
print(f"{i}. {s1} - {s2}: 聚类 {cluster_id}, 特征距离 = {dist:.3f}")
visualize_clustering(prices_demo, n_clusters=5)

四、交易信号生成
找到配对后,需要定义交易规则。
4.1 Z-score 入场/出场
def zscore_trading_signals(spread, entry_threshold=2.0, exit_threshold=0.5,
stop_loss_threshold=4.0, lookback=20):
"""
基于 Z-score 的交易信号生成
参数:
spread: 价差序列
entry_threshold: 入场 Z-score 阈值(如 ±2)
exit_threshold: 出场 Z-score 阈值(如 ±0.5)
stop_loss_threshold: 止损阈值(如 ±4)
lookback: 均值和标准差的滚动窗口
返回:
signals: 信号 DataFrame (long_spread, short_spread, position)
"""
# 计算滚动统计量
spread_mean = spread.rolling(window=lookback).mean()
spread_std = spread.rolling(window=lookback).std()
# Z-score
z_score = (spread - spread_mean) / spread_std
# 初始化信号
signals = pd.DataFrame(index=spread.index)
signals['z_score'] = z_score
signals['spread'] = spread
signals['position'] = 0 # 1=做多价差, -1=做空价差, 0=空仓
current_position = 0
for i in range(lookback, len(spread)):
z = z_score.iloc[i]
if current_position == 0: # 空仓状态
# 价差异常低 → 做多价差(做多被低估的,做空被高估的)
if z < -entry_threshold:
current_position = 1
# 价差异常高 → 做空价差
elif z > entry_threshold:
current_position = -1
elif current_position == 1: # 持有多头价差
# 回归到均值 → 平仓
if z > -exit_threshold:
current_position = 0
# 止损
elif z < -stop_loss_threshold:
current_position = 0
elif current_position == -1: # 持有空头价差
# 回归到均值 → 平仓
if z < exit_threshold:
current_position = 0
# 止损
elif z > stop_loss_threshold:
current_position = 0
signals.iloc[i, signals.columns.get_loc('position')] = current_position
return signals
# 演示信号生成
np.random.seed(42)
# 使用之前协整的价差
spread_series = results['spread']
# 生成信号
signals = zscore_trading_signals(spread_series, entry_threshold=2.0,
exit_threshold=0.5, stop_loss_threshold=4.0)
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
# 价差和 Z-score
ax1 = axes[0]
ax1.plot(signals.index, signals['spread'].values, label='Spread', color='steelblue', alpha=0.7)
ax1.set_ylabel('Spread', color='steelblue')
ax1.tick_params(axis='y', labelcolor='steelblue')
ax1_twin = ax1.twinx()
ax1_twin.plot(signals.index, signals['z_score'].values, label='Z-Score', color='orange', alpha=0.7)
ax1_twin.axhline(y=2, color='red', linestyle='--', alpha=0.5, label='Entry (±2)')
ax1_twin.axhline(y=-2, color='green', linestyle='--', alpha=0.5)
ax1_twin.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5, label='Exit (±0.5)')
ax1_twin.axhline(y=-0.5, color='gray', linestyle=':', alpha=0.5)
ax1_twin.set_ylabel('Z-Score', color='orange')
ax1_twin.tick_params(axis='y', labelcolor='orange')
# 标记交易入场点
long_entries = signals[(signals['position'] == 1) & (signals['position'].shift(1) != 1)]
short_entries = signals[(signals['position'] == -1) & (signals['position'].shift(1) != -1)]
ax1.scatter(long_entries.index, long_entries['spread'], color='green', marker='^',
            s=60, zorder=5, label='Long Entry')
ax1.scatter(short_entries.index, short_entries['spread'], color='red', marker='v',
            s=60, zorder=5, label='Short Entry')
axes[0].set_title('价差和 Z-Score')
# 持仓
axes[1].plot(signals.index, signals['position'].values, drawstyle='steps-post',
label='Position', linewidth=2)
axes[1].fill_between(signals.index, 0, signals['position'].values,
where=signals['position'].values > 0, alpha=0.3, color='green', label='Long Spread')
axes[1].fill_between(signals.index, 0, signals['position'].values,
where=signals['position'].values < 0, alpha=0.3, color='red', label='Short Spread')
axes[1].axhline(y=0, color='black', linestyle='-', alpha=0.3)
axes[1].set_ylabel('Position')
axes[1].set_xlabel('Date')
axes[1].set_title('交易信号')
axes[1].legend(loc='upper left')
axes[1].grid(True, alpha=0.3)
axes[1].set_ylim(-1.5, 1.5)
plt.tight_layout()
plt.show()

4.2 Bollinger Band 变体
def bollinger_band_signals(spread, lookback=20, num_std=2.0):
"""
Bollinger Band 信号生成
参数:
spread: 价差序列
lookback: 均线和标准差窗口
num_std: 标准差倍数
返回:
signals: 信号 DataFrame
"""
# 计算 BB
middle = spread.rolling(window=lookback).mean()
std = spread.rolling(window=lookback).std()
upper = middle + num_std * std
lower = middle - num_std * std
# 生成信号
signals = pd.DataFrame(index=spread.index)
signals['spread'] = spread
signals['upper'] = upper
signals['lower'] = lower
signals['middle'] = middle
    # 信号:触及上轨做空价差,触及下轨做多价差
    signals['position'] = np.nan
    signals.loc[spread > upper, 'position'] = -1
    signals.loc[spread < lower, 'position'] = 1
    # 平仓:价差回归到中轨附近(价差均值可能接近 0 或为负,用 ±0.1σ 带而非百分比区间)
    signals.loc[(spread - middle).abs() < 0.1 * std, 'position'] = 0
    # 入场与出场之间保持前一持仓
    signals['position'] = signals['position'].ffill().fillna(0)
    return signals

五、完整配对交易系统
将所有组件整合在一起。
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
class PairsTradingSystem:
"""
完整的配对交易系统
功能:
1. 数据模拟/加载
2. 协整检验
3. 信号生成
4. 回测
5. 绩效评估
"""
def __init__(self, stock1_name='Stock_A', stock2_name='Stock_B'):
self.stock1_name = stock1_name
self.stock2_name = stock2_name
self.prices1 = None
self.prices2 = None
self.hedge_ratio = None
self.spread = None
self.signals = None
self.returns = None
self.performance = {}
def generate_cointegrated_data(self, n_periods=500, beta=1.5, alpha=10,
noise_std=2, seed=42):
"""
生成模拟的协整数据
参数:
n_periods: 数据期数
beta: 真实对冲比率
alpha: 截距
noise_std: 价差噪声标准差
seed: 随机种子
"""
np.random.seed(seed)
# 生成随机游走 x
x = np.cumsum(np.random.normal(0, 1, n_periods))
x = pd.Series(x, name=self.stock2_name)
# 生成与 x 协整的 y
spread_noise = np.random.normal(0, noise_std, n_periods)
y = alpha + beta * x + spread_noise
y = pd.Series(y, name=self.stock1_name)
dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
self.prices1 = pd.Series(y.values, index=dates, name=self.stock1_name)
self.prices2 = pd.Series(x.values, index=dates, name=self.stock2_name)
return self.prices1, self.prices2
def load_data(self, prices1, prices2):
"""加载实际数据"""
self.prices1 = prices1
self.prices2 = prices2
def test_cointegration(self, significance_level=0.05):
"""
Engle-Granger 协整检验
返回:
is_cointegrated: 是否协整
hedge_ratio: 对冲比率
p_value: p 值
"""
if self.prices1 is None or self.prices2 is None:
raise ValueError("请先加载或生成数据")
# 对齐数据
df = pd.DataFrame({self.stock1_name: self.prices1,
self.stock2_name: self.prices2}).dropna()
y = df[self.stock1_name].values
x = df[self.stock2_name].values
# OLS 回归
x_with_const = np.column_stack([np.ones(len(x)), x])
beta_hat = np.linalg.lstsq(x_with_const, y, rcond=None)[0]
alpha, beta = beta_hat[0], beta_hat[1]
# 计算价差
spread = y - alpha - beta * x
self.spread = pd.Series(spread, index=df.index)
# ADF 检验
adf_result = adfuller(spread, maxlag=1, regression='c')
self.hedge_ratio = beta
is_cointegrated = adf_result[1] < significance_level
self.cointegration_results = {
'is_cointegrated': is_cointegrated,
'hedge_ratio': beta,
'alpha': alpha,
'p_value': adf_result[1],
'test_statistic': adf_result[0],
'critical_values': adf_result[4]
}
return is_cointegrated, beta, adf_result[1]
def generate_signals(self, entry_threshold=2.0, exit_threshold=0.5,
stop_loss_threshold=4.0, lookback=20):
"""
生成交易信号
参数:
entry_threshold: 入场 Z-score
exit_threshold: 出场 Z-score
stop_loss_threshold: 止损 Z-score
lookback: 滚动窗口
"""
if self.spread is None:
raise ValueError("请先运行协整检验")
spread = self.spread
spread_mean = spread.rolling(window=lookback).mean()
spread_std = spread.rolling(window=lookback).std()
z_score = (spread - spread_mean) / spread_std
self.signals = pd.DataFrame(index=spread.index)
self.signals['spread'] = spread
self.signals['z_score'] = z_score
self.signals['position'] = 0
current_position = 0
for i in range(lookback, len(spread)):
z = z_score.iloc[i]
if current_position == 0:
if z < -entry_threshold:
current_position = 1 # 做多价差
elif z > entry_threshold:
current_position = -1 # 做空价差
elif current_position == 1:
if z > -exit_threshold or z < -stop_loss_threshold:
current_position = 0
elif current_position == -1:
if z < exit_threshold or z > stop_loss_threshold:
current_position = 0
self.signals.iloc[i, self.signals.columns.get_loc('position')] = current_position
return self.signals
def backtest(self, transaction_cost=0.001):
"""
回测策略
参数:
transaction_cost: 双边交易成本(比例)
返回:
returns: 策略收益序列
"""
if self.signals is None:
raise ValueError("请先生成信号")
# 计算价格收益
returns1 = self.prices1.pct_change()
returns2 = self.prices2.pct_change()
# 对齐
df = pd.DataFrame({
'position': self.signals['position'],
'ret1': returns1,
'ret2': returns2
}).dropna()
# 计算策略收益
# 做多价差 = 做多 stock1 + 做空 hedge_ratio * stock2
        # 用前一期持仓乘以当期收益,避免前视偏差
        df['strategy_return'] = (
            df['position'].shift(1).fillna(0) * df['ret1'] -
            df['position'].shift(1).fillna(0) * self.hedge_ratio * df['ret2']
        )
# 计算交易成本
df['position_change'] = df['position'].diff().abs()
df['transaction_cost'] = df['position_change'] * transaction_cost
# 扣除成本后的收益
df['net_return'] = df['strategy_return'] - df['transaction_cost']
self.returns = df['net_return']
# 计算绩效指标
self._calculate_performance()
return self.returns
def _calculate_performance(self):
"""计算绩效指标"""
if self.returns is None:
return
returns = self.returns
# 基本指标
total_return = (1 + returns).prod() - 1
        annual_return = returns.mean() * 252  # 简单年化:日均收益 × 252
annual_vol = returns.std() * np.sqrt(252)
sharpe_ratio = annual_return / annual_vol if annual_vol > 0 else 0
# 最大回撤
cumulative = (1 + returns).cumprod()
running_max = cumulative.expanding().max()
drawdown = (cumulative - running_max) / running_max
max_drawdown = drawdown.min()
# 胜率
winning_days = (returns > 0).sum()
total_days = (returns != 0).sum()
win_rate = winning_days / total_days if total_days > 0 else 0
# 交易次数
trades = self.signals['position'].diff().abs().sum() / 2
self.performance = {
'total_return': total_return,
'annual_return': annual_return,
'annual_volatility': annual_vol,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'total_trades': trades
}
def plot_results(self):
"""可视化回测结果"""
if self.signals is None or self.returns is None:
raise ValueError("请先运行回测")
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# 1. 价格和协整关系
ax1 = axes[0]
ax1.plot(self.prices1.index, self.prices1.values,
label=self.stock1_name, alpha=0.7)
ax1.plot(self.prices2.index,
self.cointegration_results['alpha'] +
self.hedge_ratio * self.prices2.values,
label=f"{self.stock2_name} (adjusted)", alpha=0.7)
ax1.set_title('价格和协整关系')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 价差和信号
ax2 = axes[1]
ax2.plot(self.signals.index, self.signals['spread'].values,
label='Spread', color='steelblue')
ax2.fill_between(self.signals.index,
self.signals['spread'].mean() - 2*self.signals['spread'].std(),
self.signals['spread'].mean() + 2*self.signals['spread'].std(),
alpha=0.1, color='green', label='±2σ')
long_positions = self.signals['position'] == 1
short_positions = self.signals['position'] == -1
ax2.scatter(self.signals.index[long_positions],
self.signals['spread'].values[long_positions],
color='green', marker='^', s=30, label='Long Spread', zorder=5)
ax2.scatter(self.signals.index[short_positions],
self.signals['spread'].values[short_positions],
color='red', marker='v', s=30, label='Short Spread', zorder=5)
ax2.set_title('价差和交易信号')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 累计收益
ax3 = axes[2]
cumulative_returns = (1 + self.returns).cumprod()
ax3.plot(cumulative_returns.index, cumulative_returns.values,
label='Strategy', linewidth=2, color='darkblue')
ax3.axhline(y=1, color='black', linestyle='--', alpha=0.5)
ax3.set_title('累计收益')
ax3.set_ylabel('Cumulative Return')
ax3.grid(True, alpha=0.3)
# 添加绩效信息
perf_text = f"""
总收益: {self.performance['total_return']:.2%}
年化收益: {self.performance['annual_return']:.2%}
夏普比率: {self.performance['sharpe_ratio']:.2f}
最大回撤: {self.performance['max_drawdown']:.2%}
胜率: {self.performance['win_rate']:.1%}
交易次数: {self.performance['total_trades']:.0f}
"""
ax3.text(0.02, 0.1, perf_text, transform=ax3.transAxes,
fontsize=10, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout()
plt.show()
return fig
def print_summary(self):
"""打印策略摘要"""
print("=" * 60)
print("配对交易策略回测报告")
print("=" * 60)
print(f"\n配对: {self.stock1_name} vs {self.stock2_name}")
print(f"对冲比率: {self.hedge_ratio:.4f}")
print(f"\n协整检验:")
print(f" 状态: {'✅ 协整' if self.cointegration_results['is_cointegrated'] else '❌ 不协整'}")
print(f" p 值: {self.cointegration_results['p_value']:.6f}")
print(f"\n绩效指标:")
for key, value in self.performance.items():
if isinstance(value, float):
if 'rate' in key or 'return' in key or 'drawdown' in key:
print(f" {key}: {value:.2%}")
else:
print(f" {key}: {value:.4f}")
else:
print(f" {key}: {value}")
print("=" * 60)
# ============ 使用示例 ============
# 创建系统实例
system = PairsTradingSystem('Stock_A', 'Stock_B')
# 生成协整数据
prices1, prices2 = system.generate_cointegrated_data(
n_periods=500,
beta=1.5,
alpha=10,
noise_std=3,
seed=42
)
# 协整检验
is_cointegrated, hedge_ratio, p_value = system.test_cointegration()
print(f"协整检验: {'通过' if is_cointegrated else '未通过'}, p 值: {p_value:.6f}")
# 生成信号
system.generate_signals(entry_threshold=2.0, exit_threshold=0.5, lookback=20)
# 回测
returns = system.backtest(transaction_cost=0.001)
# 可视化和报告
system.plot_results()
system.print_summary()

六、多配对组合
实际交易中会同时交易多个配对以分散风险。
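先用一个小实验直观说明分散的作用:若各配对的收益近似互不相关且波动率相同,N 个配对等权组合的波动率约为单一配对的 $1/\sqrt{N}$(以下数据为模拟的假设收益流):

```python
import numpy as np

np.random.seed(3)
n_days, n_pairs = 2000, 9
# 模拟 9 个互不相关的配对日收益流
pair_rets = np.random.normal(0.0005, 0.01, size=(n_days, n_pairs))

single_vol = pair_rets[:, 0].std()
portfolio_vol = pair_rets.mean(axis=1).std()
print(f"单配对日波动率:   {single_vol:.4f}")
print(f"等权组合日波动率: {portfolio_vol:.4f}")  # 约为单配对的 1/3
```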
class MultiPairsPortfolio:
"""
多配对组合管理
功能:
1. 管理多个配对
2. 资金分配
3. 风险控制
"""
def __init__(self, initial_capital=1000000):
self.initial_capital = initial_capital
self.pairs = []
self.portfolio_returns = None
def add_pair(self, stock1, stock2, hedge_ratio, spread, signals):
"""添加一个配对"""
pair = {
'stock1': stock1,
'stock2': stock2,
'hedge_ratio': hedge_ratio,
'spread': spread,
'signals': signals
}
self.pairs.append(pair)
def calculate_portfolio_returns(self, prices_dict, weights=None):
"""
计算组合收益
参数:
prices_dict: 价格字典 {stock_name: price_series}
weights: 各配对权重(默认等权)
"""
if not self.pairs:
raise ValueError("没有添加任何配对")
n_pairs = len(self.pairs)
if weights is None:
weights = np.ones(n_pairs) / n_pairs
if len(weights) != n_pairs:
raise ValueError("权重数量与配对数量不匹配")
# 计算每个配对的收益
pair_returns = []
for pair in self.pairs:
s1 = pair['stock1']
s2 = pair['stock2']
hedge = pair['hedge_ratio']
position = pair['signals']['position']
ret1 = prices_dict[s1].pct_change()
ret2 = prices_dict[s2].pct_change()
# 配对收益
pair_ret = position.shift(1) * ret1 - position.shift(1) * hedge * ret2
pair_returns.append(pair_ret)
# 组合收益
returns_df = pd.DataFrame(pair_returns).T
self.portfolio_returns = returns_df.mul(weights, axis=1).sum(axis=1)
return self.portfolio_returns
def calculate_performance(self):
"""计算组合绩效"""
if self.portfolio_returns is None:
return None
returns = self.portfolio_returns.dropna()
metrics = {
'total_return': (1 + returns).prod() - 1,
            'annual_return': returns.mean() * 252,
            'annual_vol': returns.std() * np.sqrt(252),
            'sharpe': returns.mean() * 252 / (returns.std() * np.sqrt(252)),
}
# 最大回撤
cumulative = (1 + returns).cumprod()
running_max = cumulative.expanding().max()
drawdown = (cumulative - running_max) / running_max
metrics['max_drawdown'] = drawdown.min()
        return metrics

七、统计套利的风险
7.1 主要风险类型
| 风险类型 | 描述 | 缓解方法 |
|---|---|---|
| 协整关系断裂 | 历史关系不再成立 | 定期重新检验,设置止损 |
| 制度转换 | 市场结构性变化 | 多元化配对,动态调整 |
| 执行成本 | 滑点、佣金、价差 | 优化交易时机,控制换手 |
| 模型风险 | 模型假设错误 | 压力测试,多个模型验证 |
| 流动性风险 | 无法按期望价格成交 | 选择流动性好的配对 |
7.2 风险管理实践
def risk_management_checks(current_spread, entry_spread, holding_days,
                           current_position, checks_config):
    """
    风险管理检查

    参数:
        current_spread: 当前价差
        entry_spread: 开仓时的价差
        holding_days: 已持仓天数
        current_position: 当前持仓 (1=多价差, -1=空价差, 0=空仓)
        checks_config: 风险参数配置

    返回:
        action: 建议操作 ('hold', 'close')
        reason: 原因说明
    """
    max_loss = checks_config.get('max_loss', 0.10)                # 最大亏损比例
    max_holding_days = checks_config.get('max_holding_days', 20)  # 最长持仓天数

    if current_position == 0:
        return 'hold', '当前无持仓'

    # 持仓盈亏:做多价差时价差上涨为盈利,做空价差时相反
    pnl = current_position * (current_spread - entry_spread) / abs(entry_spread)
    if pnl < -max_loss:
        return 'close', f'触发止损: 亏损 {pnl:.2%}'

    # 持仓过久仍未回归,可能协整关系已断裂
    if holding_days > max_holding_days:
        return 'close', f'超过最大持仓期 ({max_holding_days} 天)'

    return 'hold', '风险正常'

八、ML 增强统计套利
8.1 LSTM 预测价差
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
def prepare_lstm_data(spread, lookback=20):
"""
准备 LSTM 训练数据
参数:
spread: 价差序列
lookback: 输入窗口
返回:
X, y: 训练数据
"""
data = spread.values.reshape(-1, 1)
X, y = [], []
for i in range(lookback, len(data)):
X.append(data[i-lookback:i, 0])
y.append(data[i, 0])
    # LSTM 需要 (样本数, 时间步, 特征数) 的三维输入
    return np.array(X).reshape(-1, lookback, 1), np.array(y)
def build_lstm_model(lookback=20):
"""构建 LSTM 模型"""
model = Sequential([
LSTM(50, activation='relu', input_shape=(lookback, 1)),
Dense(1)
])
model.compile(optimizer='adam', loss='mse')
return model
# 使用示例(需要安装 tensorflow)
"""
# 准备数据
X, y = prepare_lstm_data(spread_series, lookback=20)
# 划分训练集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# 构建和训练模型
model = build_lstm_model(lookback=20)
model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=0)
# 预测
predictions = model.predict(X_test)
# 基于预测生成信号
# 如果预测价差会上涨 → 做多价差
# 如果预测价差会下跌 → 做空价差
"""8.2 动态对冲比率
传统方法使用固定对冲比率,ML 可以实现动态调整:
def dynamic_hedge_ratio(price1, price2, window=60):
"""
动态对冲比率
使用滚动窗口回归,实时更新对冲比率
参数:
price1, price2: 价格序列
window: 回归窗口
返回:
hedge_ratios: 动态对冲比率序列
"""
hedge_ratios = pd.Series(index=price1.index, dtype=float)
for i in range(window, len(price1)):
y = price1.iloc[i-window:i].values
x = price2.iloc[i-window:i].values
        # OLS 回归斜率(np.cov 默认 ddof=1 而 np.var 默认 ddof=0,需统一)
        slope = np.cov(x, y, ddof=0)[0, 1] / np.var(x)
hedge_ratios.iloc[i] = slope
    return hedge_ratios

核心知识点总结
协整检验
- Engle-Granger 两步法:适合两变量
- Johansen 检验:适合多变量
- 平稳性检验:ADF 检验验证残差平稳
配对选择
- 距离法:价格走势相似度
- 协整法:直接检验协整关系
- 聚类法:基于特征分组
信号生成
- Z-score:标准化价差
- Bollinger Band:均值 ± 标准差带
- 阈值设置:入场、出场、止损
风险管理
- 协整关系断裂风险
- 执行成本控制
- 多配对分散风险
ML 增强
- LSTM 预测价差方向
- 动态对冲比率
- 聚类发现配对
下一步
继续学习 03-信号到组合.md,了解如何将 alpha 信号转化为实际可交易的投资组合。