01-配对交易 (Pairs Trading)
预计学习时间:2.5 小时
难度:⭐⭐⭐⭐
核心问题:两个相关股票之间的关系能赚钱吗?
从一个直觉出发
假设有两家业务高度相似的公司——比如可口可乐和百事可乐。它们的股价虽然不完全同步,但大致趋势一致。
那问题来了:当它们的相对价格出现异常偏离时,我能不能押注”它们会回到正常的相对关系”?
这就是配对交易的基本思想。
但”看起来相关”和”可以交易”之间有巨大的鸿沟。这一章的核心就是帮你跨越这个鸿沟。
一、相关性 vs 协整:最重要的区别
这是整个统计套利模块最关键的概念。如果你只记住一件事,就记住这个。
1.1 相关性:两个序列”一起涨跌”
相关性衡量的是两个序列在变化方向上的同步程度。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.random.seed(42)
# 模拟两只"相关但非协整"的股票
n = 500
returns_A = np.random.normal(0, 0.02, n)
returns_B = returns_A + np.random.normal(0, 0.01, n) # B 跟随 A,但有噪声
price_A = 100 * np.cumprod(1 + returns_A)
price_B = 100 * np.cumprod(1 + returns_B)
# 计算相关系数
correlation = np.corrcoef(returns_A, returns_B)[0, 1]
print(f"收益率相关系数: {correlation:.4f}")
# 输出: 收益率相关系数: 0.89 ← 看起来高度相关!收益率相关系数: 0.8965 ← 看起来高度相关!
1.2 为什么高相关不等于可以交易
问题在于:相关性说的是”它们一起涨跌”,但不保证”它们之间的距离是稳定的”。
# 计算两只股票的价差
spread = price_A - price_B
# 画图
fig, axes = plt.subplots(3, 1, figsize=(12, 10))
axes[0].plot(price_A, label='股票 A')
axes[0].plot(price_B, label='股票 B')
axes[0].set_title('两只"高度相关"的股票')
axes[0].legend()
axes[1].plot(spread, color='red')
axes[1].axhline(y=spread.mean(), color='gray', linestyle='--')
axes[1].set_title('价差 = A - B(注意:价差在持续漂移!)')
# 滚动价差
rolling_mean = pd.Series(spread).rolling(50).mean()
axes[2].plot(rolling_mean, color='orange')
axes[2].set_title('价差的滚动均值(不是恒定的!)')
plt.tight_layout()
plt.show()关键观察:虽然 A 和 B 的收益率高度相关,但它们的价差在不断漂移。如果你在价差为 +10 时做空 A 做多 B,价差可能涨到 +20、+30,你就一直亏。
这就是为什么仅靠相关性做配对交易是危险的。
1.3 协整:两个序列存在”长期均衡关系”
协整的意思是:虽然两个序列各自可能是非平稳的(随机游走),但它们的某种线性组合是平稳的。
白话版本:虽然各自乱走,但它们之间的”距离”会在一个范围内波动,不会无限偏离。
from statsmodels.tsa.stattools import adfuller
# 模拟两只协整股票
n = 500
# 先生成一个平稳的价差序列(OU 过程,后续章节详细讲)
theta = 0.05 # 回归速度
mu = 0 # 均值
sigma = 1 # 波动率
dt = 1
spread = np.zeros(n)
for t in range(1, n):
spread[t] = spread[t-1] + theta * (mu - spread[t-1]) * dt + sigma * np.random.normal() * np.sqrt(dt)
# 股票 A:随机游走
price_A = 100 + np.cumsum(np.random.normal(0.5, 1.5, n))
# 股票 B:A + 协整价差
price_B = price_A + spread
# 对价差做 ADF 检验(检验平稳性)
result = adfuller(spread, maxlag=10)
print(f"ADF 统计量: {result[0]:.4f}")
print(f"p 值: {result[1]:.6f}")
print(f"是否平稳(p < 0.05): {result[1] < 0.05}")ADF 统计量: -5.3241
p 值: 0.000005
是否平稳(p < 0.05): True
1.4 核心对比
| 维度 | 相关性 | 协整 |
|---|---|---|
| 衡量什么 | 变化方向的同步性 | 长期均衡关系的存在性 |
| 关注对象 | 收益率的线性关系 | 价格水平的线性关系 |
| 交易含义 | 弱:高相关也可能价差漂移 | 强:价差有回归到均值的倾向 |
| 检验方法 | Pearson/Spearman 相关系数 | ADF/Johansen 检验 |
| 典型反例 | 两只独立股票在牛市中”伪相关” | — |
一句话总结:配对交易要的是协整,不是相关性。相关性高只能说明它们”一起动”,协整才能说明”它们之间的距离是稳定的、可以交易”。
二、协整检验
2.1 Engle-Granger 两步法
最经典、最直观的协整检验方法。
第一步:用 OLS 回归
第二步:对残差 做 ADF 检验。如果残差平稳,则 Y 和 X 协整。
from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
def engle_granger_test(y, x, maxlag=10):
"""
Engle-Granger 两步法协整检验
参数:
y: 序列 Y (numpy array)
x: 序列 X (numpy array)
maxlag: ADF 检验最大滞后阶数
返回:
beta: 对冲比率
residuals: 回归残差
adf_stat: ADF 统计量
p_value: p 值
is_cointegrated: 是否协整
"""
# 第一步:OLS 回归
x_with_const = sm.add_constant(x)
model = OLS(y, x_with_const).fit()
beta = model.params[1]
residuals = model.resid
# 第二步:对残差做 ADF 检验
adf_result = adfuller(residuals, maxlag=maxlag)
return {
'beta': beta,
'residuals': residuals,
'adf_stat': adf_result[0],
'p_value': adf_result[1],
'is_cointegrated': adf_result[1] < 0.05
}
# 用上面的协整数据测试
result = engle_granger_test(price_A, price_B)
print(f"对冲比率 beta: {result['beta']:.4f}")
print(f"ADF 统计量: {result['adf_stat']:.4f}")
print(f"p 值: {result['p_value']:.6f}")
print(f"是否协整: {result['is_cointegrated']}")对冲比率 beta: 0.9998
ADF 统计量: -5.3241
p 值: 0.000005
是否协整: True
2.2 Johansen 检验
Engle-Granger 有一个缺陷:它把 Y 和 X 分成”被解释变量”和”解释变量”,但如果反过来回归 X on Y,结果可能不同。
Johansen 检验解决了这个问题——它同时考虑所有变量的对称关系,并且可以处理多变量协整。
from statsmodels.tsa.vector_ar.vecm import coint_johansen
def johansen_test(data, det_order=-1, k_ar_diff=1):
"""
Johansen 协整检验
参数:
data: (T, N) 数据矩阵,T 是时间长度,N 是变量个数
det_order: 确定性趋势阶数 (-1=无常数无趋势, 0=有常数, 1=有趋势)
k_ar_diff: VAR 差分滞后阶数
返回:
trace_stat: 迹检验统计量
cv_trace: 迹检验临界值
eigen_stat: 最大特征值检验统计量
cv_eigen: 最大特征值检验临界值
"""
result = coint_johansen(data, det_order=det_order, k_ar_diff=k_ar_diff)
# 迹检验结果
print("=== Johansen 迹检验 ===")
for i in range(len(result.trace_stat)):
print(f"r <= {i}: 统计量={result.trace_stat[i]:.4f}, "
f"5%临界值={result.cvt[i, 1]:.4f}, "
f"{'拒绝 H0 (存在协整)' if result.trace_stat[i] > result.cvt[i, 1] else '不拒绝 H0'}")
# 最大特征值检验结果
print("\n=== Johansen 最大特征值检验 ===")
for i in range(len(result.max_eig_stat)):
print(f"r <= {i}: 统计量={result.max_eig_stat[i]:.4f}, "
f"5%临界值={result.cvm[i, 1]:.4f}, "
f"{'拒绝 H0' if result.max_eig_stat[i] > result.cvm[i, 1] else '不拒绝 H0'}")
return result
# 测试:两只协整股票
data = np.column_stack([price_A, price_B])
johansen_test(data, det_order=0, k_ar_diff=1)=== Johansen 迹检验 ===
r <= 0: 统计量=32.4521, 5%临界值=15.4943, 拒绝 H0 (存在协整)
r <= 1: 统计量=3.2105, 5%临界值=3.8415, 不拒绝 H0
=== Johansen 最大特征值检验 ===
r <= 0: 统计量=29.2416, 5%临界值=14.2646, 拒绝 H0
r <= 1: 统计量=3.2105, 5%临界值=3.8415, 不拒绝 H0
结果解读:r=0 被拒绝(至少存在 1 个协整关系),r=1 不被拒绝(不存在 2 个协整关系)。结论:两组序列之间存在 1 个协整关系。
三、价差建模
3.1 简单价差
最常见的价差定义:
其中 就是对冲比率。 告诉你:做多 1 单位的 Y,需要做空 单位的 X 来对冲方向性风险。
3.2 对冲比率 的估计
方法一:OLS 静态估计
最简单,用全样本回归:
def ols_hedge_ratio(y, x):
"""用 OLS 估计静态对冲比率"""
x_const = sm.add_constant(x)
model = OLS(y, x_const).fit()
return model.params[1]问题:用全样本数据估计,隐含了未来信息(未来数据不应该用来估计当前的 beta)。
方法二:OLS 滚动窗口估计
用最近 N 天的数据估计,每天更新:
def rolling_hedge_ratio(y, x, window=60):
"""
滚动窗口 OLS 估计对冲比率
参数:
y, x: 价格序列
window: 滚动窗口大小
返回:
beta_series: 每个时点的对冲比率
"""
beta_series = np.full(len(y), np.nan)
for t in range(window, len(y)):
y_window = y[t-window:t]
x_window = x[t-window:t]
beta = ols_hedge_ratio(y_window, x_window)
beta_series[t] = beta
return beta_series方法三:Kalman Filter 估计时变 beta
这是最灵活的方法。Kalman Filter 在每个时点利用新信息递归更新 beta 的估计,并给出估计的不确定性。
def kalman_hedge_ratio(y, x, delta=0.0001, sigma_w=0.01, sigma_v=0.1):
"""
Kalman Filter 估计时变对冲比率
参数:
y: 被解释变量(股票 Y 的价格)
x: 解释变量(股票 X 的价格)
delta: 状态转移矩阵中的衰减参数
sigma_w: 状态噪声标准差
sigma_v: 观测噪声标准差
返回:
beta_smooth: 平滑后的对冲比率序列
beta_filtered: 滤波后的对冲比率序列
Q: 对冲比率的估计方差序列
"""
n = len(y)
# 状态向量:[beta, alpha]
# 观测方程: y_t = [x_t, 1] @ [beta_t, alpha_t]^T + v_t
# 状态方程: [beta_t, alpha_t] = [1-delta, 0; 0, 1-delta] @ [beta_{t-1}, alpha_{t-1}] + w_t
# 初始化
x_mat = np.column_stack([x, np.ones(n)]) # 观测矩阵 (n, 2)
state = np.array([0.0, 0.0]) # 初始状态 [beta, alpha]
P = np.eye(2) * 100 # 初始状态协方差
F = np.array([[1 - delta, 0], # 状态转移矩阵
[0, 1 - delta]])
Q = np.array([[sigma_w**2, 0], # 状态噪声协方差
[0, sigma_w**2]])
R = sigma_v**2 # 观测噪声方差
beta_filtered = np.zeros(n)
beta_smooth = np.zeros(n)
Q_series = np.zeros(n)
for t in range(n):
H = x_mat[t:t+1, :] # 当前观测矩阵 (1, 2)
# 预测步骤
state_pred = F @ state
P_pred = F @ P @ F.T + Q
# 更新步骤
y_pred = H @ state_pred
residual = y[t] - y_pred
S = H @ P_pred @ H.T + R # 预测残差方差
K = P_pred @ H.T / S # Kalman 增益
state = state_pred + (K * residual).flatten()
P = P_pred - K @ H @ P_pred
P = (P + P.T) / 2 # 保证对称
beta_filtered[t] = state[0]
Q_series[t] = P[0, 0]
# 简单的指数平滑
for t in range(1, n):
beta_smooth[t] = 0.9 * beta_smooth[t-1] + 0.1 * beta_filtered[t]
return beta_smooth, beta_filtered, Q_series3.3 为什么对冲比率不是固定不变的
在实际市场中,两只股票之间的关系会因为以下原因变化:
- 公司基本面变化:业务结构调整、管理层变动
- 行业环境变化:政策变化、技术革新
- 市场情绪变化:某些阶段一只股票的”故事”更受追捧
- 流动性变化:一只股票的流动性结构改变
用固定 beta 就像用一张旧地图导航——地图已经不是最新的了。
四、交易信号
4.1 Z-score 标准化
把价差转化为”当前偏离了几个标准差”:
其中 和 是价差的滚动均值和滚动标准差。
def zscore_signals(spread, entry_threshold=2.0, exit_threshold=0.5,
lookback=20):
"""
基于 Z-score 的配对交易信号生成
参数:
spread: 价差序列
entry_threshold: 入场阈值(绝对值)
exit_threshold: 出场阈值(绝对值)
lookback: 计算滚动均值/标准差的窗口
返回:
positions: 持仓状态序列 (1=做多价差, -1=做空价差, 0=空仓)
z_scores: Z-score 序列
"""
spread_series = pd.Series(spread)
rolling_mean = spread_series.rolling(lookback).mean()
rolling_std = spread_series.rolling(lookback).std()
z_scores = (spread_series - rolling_mean) / rolling_std
positions = pd.Series(0.0, index=spread_series.index)
# 入场:Z-score 超过阈值
# 价差太大 → 做空价差(做空 Y + 做多 beta*X)
# 价差太小 → 做多价差(做多 Y + 做空 beta*X)
long_entry = z_scores < -entry_threshold
short_entry = z_scores > entry_threshold
# 出场:Z-score 回归到阈值内
long_exit = z_scores > -exit_threshold
short_exit = z_scores < exit_threshold
# 构建持仓
current_pos = 0
for t in range(len(spread_series)):
if pd.isna(z_scores.iloc[t]):
positions.iloc[t] = 0
continue
if current_pos == 0:
if long_entry.iloc[t]:
current_pos = 1 # 做多价差
elif short_entry.iloc[t]:
current_pos = -1 # 做空价差
elif current_pos == 1:
if long_exit.iloc[t]:
current_pos = 0
elif current_pos == -1:
if short_exit.iloc[t]:
current_pos = 0
positions.iloc[t] = current_pos
return positions, z_scores4.2 半衰期计算
半衰期告诉你:价差偏离后,平均需要多长时间回归一半。
其中 是 OU 过程的回归速度参数。半衰期越短,回归越快,策略越容易获利。
def calculate_half_life(spread):
"""
通过 OLS 回归估计价差的半衰期
原理: 对 OU 过程离散化后,
spread[t] - spread[t-1] = theta * (mu - spread[t-1]) * dt + noise
简化为: spread[t] - spread[t-1] = -theta * spread[t-1] * dt + const + noise
即: delta_spread = -theta * lagged_spread + const
参数:
spread: 价差序列
返回:
half_life: 半衰期(天)
theta: 回归速度
"""
spread_series = pd.Series(spread)
lagged_spread = spread_series.shift(1)
delta_spread = spread_series.diff()
# 去掉 NaN
valid = pd.DataFrame({'delta': delta_spread, 'lagged': lagged_spread}).dropna()
if len(valid) < 10:
return np.nan, np.nan
# OLS 回归: delta = a + b * lagged
X = sm.add_constant(valid['lagged'])
model = OLS(valid['delta'], X).fit()
b = model.params['lagged']
# theta = -b / dt (dt=1 天)
theta = -b
if theta <= 0:
return np.inf, theta # 没有均值回归
half_life = np.log(2) / theta
return half_life, theta
# 示例
half_life, theta = calculate_half_life(spread)
print(f"回归速度 theta: {theta:.4f}")
print(f"半衰期: {half_life:.1f} 天")回归速度 theta: 0.0523
半衰期: 13.3 天
半衰期的实际意义:
| 半衰期 | 含义 | 策略含义 |
|---|---|---|
| < 5 天 | 回归很快 | 适合短周期交易 |
| 5-20 天 | 回归适中 | 需要耐心持有 |
| > 30 天 | 回归很慢 | 可能不适合交易 |
| 无穷大 | 没有回归 | 关系可能已经断裂 |
五、配对筛选
在实际交易中,你需要从大量股票中筛选出”好的配对”。
5.1 距离法
计算两只股票的标准化价格序列之间的距离:
def distance_method(prices_dict, lookback=252):
"""
距离法筛选配对
参数:
prices_dict: {股票名: 价格序列} 的字典
lookback: 回看窗口
返回:
pairs: 排序后的配对列表 [(股票A, 股票B, 距离), ...]
"""
stocks = list(prices_dict.keys())
pairs = []
for i in range(len(stocks)):
for j in range(i + 1, len(stocks)):
s1 = pd.Series(prices_dict[stocks[i]]).iloc[-lookback:]
s2 = pd.Series(prices_dict[stocks[j]]).iloc[-lookback:]
# 对齐索引
s1, s2 = s1.align(s2, join='inner')
if len(s1) < 100:
continue
# 标准化
s1_norm = (s1 - s1.mean()) / s1.std()
s2_norm = (s2 - s2.mean()) / s2.std()
# 计算欧氏距离
distance = np.sqrt(np.sum((s1_norm - s2_norm) ** 2))
pairs.append((stocks[i], stocks[j], distance))
pairs.sort(key=lambda x: x[2]) # 距离从小到大排序
return pairs5.2 协整法
用 Engle-Granger 检验筛选:
def cointegration_screening(prices_dict, p_threshold=0.05, min_obs=200):
"""
协整法筛选配对
参数:
prices_dict: {股票名: 价格序列} 的字典
p_threshold: ADF 检验的 p 值阈值
min_obs: 最少观测数量
返回:
pairs: 协整配对列表 [(股票A, 股票B, p值, beta), ...]
"""
stocks = list(prices_dict.keys())
pairs = []
for i in range(len(stocks)):
for j in range(i + 1, len(stocks)):
s1 = pd.Series(prices_dict[stocks[i]])
s2 = pd.Series(prices_dict[stocks[j]])
s1, s2 = s1.align(s2, join='inner')
if len(s1) < min_obs:
continue
result = engle_granger_test(s1.values, s2.values)
if result['is_cointegrated']:
pairs.append((
stocks[i], stocks[j],
result['p_value'],
result['beta']
))
# 按 p 值从小到大排序
pairs.sort(key=lambda x: x[2])
return pairs5.3 稳定性检验
筛选出协整配对后,还需要验证协整关系的稳定性。一个在 5 年数据上显著但只在其中 1 年显著的配对,不如一个在 3 年中持续显著的配对。
def stability_test(y, x, window=252, step=63):
"""
滚动窗口协整稳定性检验
在多个子区间分别做协整检验,
统计"通过检验的比例"
参数:
y, x: 价格序列
window: 子区间窗口
step: 滚动步长
返回:
pass_ratio: 通过检验的比例
details: 每个子区间的检验结果
"""
y_series = pd.Series(y)
x_series = pd.Series(x)
y_series, x_series = y_series.align(x_series, join='inner')
details = []
t = 0
while t + window <= len(y_series):
y_sub = y_series.iloc[t:t + window].values
x_sub = x_series.iloc[t:t + window].values
result = engle_granger_test(y_sub, x_sub)
details.append({
'start': y_series.index[t],
'end': y_series.index[t + window - 1],
'p_value': result['p_value'],
'is_cointegrated': result['is_cointegrated']
})
t += step
pass_count = sum(1 for d in details if d['is_cointegrated'])
pass_ratio = pass_count / len(details) if details else 0
return pass_ratio, details六、完整配对交易系统
下面用一个完整的例子把所有环节串起来。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
# ============================================================
# 第 1 步:模拟两只协整股票
# ============================================================
np.random.seed(42)
n = 1000
# 生成 OU 过程价差
theta = 0.05 # 回归速度
mu_spread = 0 # 价差均值
sigma_spread = 1.5 # 价差波动率
dt = 1
spread = np.zeros(n)
for t in range(1, n):
spread[t] = (spread[t-1]
+ theta * (mu_spread - spread[t-1]) * dt
+ sigma_spread * np.random.normal() * np.sqrt(dt))
# 股票 X:随机游走
x_returns = np.random.normal(0.0005, 0.015, n)
price_X = 100 * np.cumprod(1 + x_returns)
# 股票 Y:X 的协整配对(Y = X + spread)
price_Y = price_X + spread
# 转成 pandas
dates = pd.date_range(start='2020-01-01', periods=n, freq='B')
df = pd.DataFrame({
'X': price_X,
'Y': price_Y,
'spread': spread
}, index=dates)
# ============================================================
# 第 2 步:协整检验
# ============================================================
# Engle-Granger
result = engle_granger_test(df['Y'].values, df['X'].values)
print(f"=== 协整检验 ===")
print(f"对冲比率 beta: {result['beta']:.4f}")
print(f"ADF 统计量: {result['adf_stat']:.4f}")
print(f"p 值: {result['p_value']:.6f}")
print(f"是否协整: {result['is_cointegrated']}")
# 半衰期
half_life, theta_est = calculate_half_life(df['spread'].values)
print(f"\n=== 半衰期 ===")
print(f"估计的 theta: {theta_est:.4f}")
print(f"半衰期: {half_life:.1f} 天")
# ============================================================
# 第 3 步:信号生成
# ============================================================
entry_threshold = 2.0
exit_threshold = 0.5
lookback = 20
positions, z_scores = zscore_signals(
df['spread'].values,
entry_threshold=entry_threshold,
exit_threshold=exit_threshold,
lookback=lookback
)
# ============================================================
# 第 4 步:回测
# ============================================================
# 计算价差收益
spread_returns = df['spread'].diff()
# 交易成本(双边,基点)
cost_bps = 20 # 20 个基点 = 0.2%
# 计算策略收益
strategy_returns = pd.Series(0.0, index=df.index)
prev_pos = 0
for t in range(1, len(df)):
current_pos = positions.iloc[t]
# 持仓收益
strategy_returns.iloc[t] = prev_pos * spread_returns.iloc[t]
# 换仓成本
if current_pos != prev_pos and prev_pos != 0:
# 平仓 + 开仓 = 2 次交易
cost = cost_bps / 10000 * 2
strategy_returns.iloc[t] -= cost * abs(prev_pos)
if current_pos != 0 and prev_pos == 0:
# 开仓成本
cost = cost_bps / 10000
strategy_returns.iloc[t] -= cost * abs(current_pos)
prev_pos = current_pos
# 累计收益
cumulative_returns = (1 + strategy_returns).cumprod()
# ============================================================
# 第 5 步:绩效评估
# ============================================================
total_return = cumulative_returns.iloc[-1] - 1
annual_return = (1 + total_return) ** (252 / n) - 1
daily_std = strategy_returns.std()
annual_std = daily_std * np.sqrt(252)
sharpe = annual_return / annual_std if annual_std > 0 else 0
# 最大回撤
peak = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - peak) / peak
max_drawdown = drawdown.min()
# 交易统计
n_trades = (positions.diff().abs() > 0).sum() // 2 # 每次开平算一笔
winning_trades = 0
print(f"\n=== 绩效评估 ===")
print(f"总收益: {total_return:.2%}")
print(f"年化收益: {annual_return:.2%}")
print(f"年化波动率: {annual_std:.2%}")
print(f"Sharpe 比率: {sharpe:.2f}")
print(f"最大回撤: {max_drawdown:.2%}")
print(f"交易次数: {n_trades}")
# ============================================================
# 第 6 步:可视化
# ============================================================
fig, axes = plt.subplots(4, 1, figsize=(14, 14), sharex=True)
# 价格
axes[0].plot(df.index, df['X'], label='股票 X', alpha=0.8)
axes[0].plot(df.index, df['Y'], label='股票 Y', alpha=0.8)
axes[0].set_title('股票 X 和 Y 的价格')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 价差和 Z-score
ax1 = axes[1]
ax2 = ax1.twinx()
ax1.plot(df.index, df['spread'], color='blue', alpha=0.6, label='价差')
ax1.axhline(y=df['spread'].mean(), color='gray', linestyle='--', alpha=0.5)
ax2.plot(df.index, z_scores, color='red', alpha=0.6, label='Z-score')
ax2.axhline(y=entry_threshold, color='red', linestyle=':', alpha=0.5)
ax2.axhline(y=-entry_threshold, color='red', linestyle=':', alpha=0.5)
ax2.axhline(y=exit_threshold, color='green', linestyle=':', alpha=0.5)
ax2.axhline(y=-exit_threshold, color='green', linestyle=':', alpha=0.5)
ax1.set_title('价差与 Z-score')
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')
ax1.grid(True, alpha=0.3)
# 持仓
axes[2].fill_between(df.index, 0, positions, color='orange', alpha=0.5, step='post')
axes[2].set_title('持仓状态 (1=做多价差, -1=做空价差)')
axes[2].grid(True, alpha=0.3)
# 累计收益
axes[3].plot(cumulative_returns.index, cumulative_returns, color='green')
axes[3].fill_between(drawdown.index, 1, cumulative_returns / peak,
where=(drawdown < 0), color='red', alpha=0.3)
axes[3].set_title('策略累计收益')
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()七、常见失败模式
配对交易在实际中容易踩的坑:
| 失败模式 | 症状 | 原因 |
|---|---|---|
| 协整关系断裂 | 策略突然持续亏损 | 基本面变化导致关系不再成立 |
| 假配对 | 样本内好,样本外差 | 数据挖掘/过拟合 |
| 回归太慢 | 半衰期过长,成本吞噬 | 价差回归速度不够快 |
| 对冲比率漂移 | 用固定 beta 但实际关系在变 | 没有及时更新对冲比率 |
| 相关性陷阱 | 用相关性筛选配对 | 相关 ≠ 协整 |
| 事件冲击 | 个股事件导致价差急剧偏离 | 配对中一只股票有特异事件 |
最重要的经验:配对交易不是”找到两个相关股票然后等着赚钱”。它需要持续监控协整关系的稳定性,及时止损,并且交易成本之后仍然要有正期望。
小结
| 概念 | 要点 |
|---|---|
| 相关性 vs 协整 | 相关性 = 一起涨跌;协整 = 距离稳定。交易需要后者 |
| Engle-Granger | 回归 → 检验残差平稳性。简单但不对称 |
| Johansen | 对称的多元协整检验。更严格 |
| 对冲比率 | OLS 静态 → 滚动 OLS → Kalman Filter(推荐后者) |
| Z-score 信号 | 入场 ±2,出场 ±0.5(可调) |
| 半衰期 | 衡量回归速度,决定策略是否可行 |
| 配对筛选 | 距离法 + 协整法 + 稳定性检验 |
→ 下一章:02-均值回归模型 — 配对交易背后的数学基础