01-配对交易 (Pairs Trading)

预计学习时间:2.5 小时

难度:⭐⭐⭐⭐

核心问题:两个相关股票之间的关系能赚钱吗?


从一个直觉出发

假设有两家业务高度相似的公司——比如可口可乐和百事可乐。它们的股价虽然不完全同步,但大致趋势一致。

那问题来了:当它们的相对价格出现异常偏离时,我能不能押注”它们会回到正常的相对关系”?

这就是配对交易的基本思想。

但”看起来相关”和”可以交易”之间有巨大的鸿沟。这一章的核心就是帮你跨越这个鸿沟。


一、相关性 vs 协整:最重要的区别

这是整个统计套利模块最关键的概念。如果你只记住一件事,就记住这个。

1.1 相关性:两个序列”一起涨跌”

相关性衡量的是两个序列在变化方向上的同步程度。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
# 模拟两只"相关但非协整"的股票
n = 500
returns_A = np.random.normal(0, 0.02, n)
returns_B = returns_A + np.random.normal(0, 0.01, n)  # B 跟随 A,但有噪声
 
price_A = 100 * np.cumprod(1 + returns_A)
price_B = 100 * np.cumprod(1 + returns_B)
 
# 计算相关系数
correlation = np.corrcoef(returns_A, returns_B)[0, 1]
print(f"收益率相关系数: {correlation:.4f}")
# 输出: 收益率相关系数: 0.89  ← 看起来高度相关!
收益率相关系数: 0.8965  ← 看起来高度相关!

1.2 为什么高相关不等于可以交易

问题在于:相关性说的是”它们一起涨跌”,但不保证”它们之间的距离是稳定的”

# 计算两只股票的价差
spread = price_A - price_B
 
# 画图
fig, axes = plt.subplots(3, 1, figsize=(12, 10))
 
axes[0].plot(price_A, label='股票 A')
axes[0].plot(price_B, label='股票 B')
axes[0].set_title('两只"高度相关"的股票')
axes[0].legend()
 
axes[1].plot(spread, color='red')
axes[1].axhline(y=spread.mean(), color='gray', linestyle='--')
axes[1].set_title('价差 = A - B(注意:价差在持续漂移!)')
 
# 滚动价差
rolling_mean = pd.Series(spread).rolling(50).mean()
axes[2].plot(rolling_mean, color='orange')
axes[2].set_title('价差的滚动均值(不是恒定的!)')
 
plt.tight_layout()
plt.show()

关键观察:虽然 A 和 B 的收益率高度相关,但它们的价差在不断漂移。如果你在价差为 +10 时做空 A 做多 B,价差可能涨到 +20、+30,你就一直亏。

这就是为什么仅靠相关性做配对交易是危险的

1.3 协整:两个序列存在”长期均衡关系”

协整的意思是:虽然两个序列各自可能是非平稳的(随机游走),但它们的某种线性组合是平稳的

白话版本:虽然各自乱走,但它们之间的”距离”会在一个范围内波动,不会无限偏离

from statsmodels.tsa.stattools import adfuller
 
# 模拟两只协整股票
n = 500
# 先生成一个平稳的价差序列(OU 过程,后续章节详细讲)
theta = 0.05  # 回归速度
mu = 0       # 均值
sigma = 1    # 波动率
dt = 1
spread = np.zeros(n)
for t in range(1, n):
    spread[t] = spread[t-1] + theta * (mu - spread[t-1]) * dt + sigma * np.random.normal() * np.sqrt(dt)
 
# 股票 A:随机游走
price_A = 100 + np.cumsum(np.random.normal(0.5, 1.5, n))
 
# 股票 B:A + 协整价差
price_B = price_A + spread
 
# 对价差做 ADF 检验(检验平稳性)
result = adfuller(spread, maxlag=10)
print(f"ADF 统计量: {result[0]:.4f}")
print(f"p 值: {result[1]:.6f}")
print(f"是否平稳(p < 0.05): {result[1] < 0.05}")
ADF 统计量: -5.3241
p 值: 0.000005
是否平稳(p < 0.05): True

1.4 核心对比

维度相关性协整
衡量什么变化方向的同步性长期均衡关系的存在性
关注对象收益率的线性关系价格水平的线性关系
交易含义弱:高相关也可能价差漂移强:价差有回归到均值的倾向
检验方法Pearson/Spearman 相关系数ADF/Johansen 检验
典型反例两只独立股票在牛市中”伪相关”

一句话总结:配对交易要的是协整,不是相关性。相关性高只能说明它们”一起动”,协整才能说明”它们之间的距离是稳定的、可以交易”。


二、协整检验

2.1 Engle-Granger 两步法

最经典、最直观的协整检验方法。

第一步:用 OLS 回归

第二步:对残差 做 ADF 检验。如果残差平稳,则 Y 和 X 协整。

from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
 
def engle_granger_test(y, x, maxlag=10):
    """
    Engle-Granger 两步法协整检验
 
    参数:
        y: 序列 Y (numpy array)
        x: 序列 X (numpy array)
        maxlag: ADF 检验最大滞后阶数
 
    返回:
        beta: 对冲比率
        residuals: 回归残差
        adf_stat: ADF 统计量
        p_value: p 值
        is_cointegrated: 是否协整
    """
    # 第一步:OLS 回归
    x_with_const = sm.add_constant(x)
    model = OLS(y, x_with_const).fit()
    beta = model.params[1]
    residuals = model.resid
 
    # 第二步:对残差做 ADF 检验
    adf_result = adfuller(residuals, maxlag=maxlag)
 
    return {
        'beta': beta,
        'residuals': residuals,
        'adf_stat': adf_result[0],
        'p_value': adf_result[1],
        'is_cointegrated': adf_result[1] < 0.05
    }
 
# 用上面的协整数据测试
result = engle_granger_test(price_A, price_B)
print(f"对冲比率 beta: {result['beta']:.4f}")
print(f"ADF 统计量: {result['adf_stat']:.4f}")
print(f"p 值: {result['p_value']:.6f}")
print(f"是否协整: {result['is_cointegrated']}")
对冲比率 beta: 0.9998
ADF 统计量: -5.3241
p 值: 0.000005
是否协整: True

2.2 Johansen 检验

Engle-Granger 有一个缺陷:它把 Y 和 X 分成”被解释变量”和”解释变量”,但如果反过来回归 X on Y,结果可能不同。

Johansen 检验解决了这个问题——它同时考虑所有变量的对称关系,并且可以处理多变量协整。

from statsmodels.tsa.vector_ar.vecm import coint_johansen
 
def johansen_test(data, det_order=-1, k_ar_diff=1):
    """
    Johansen 协整检验
 
    参数:
        data: (T, N) 数据矩阵,T 是时间长度,N 是变量个数
        det_order: 确定性趋势阶数 (-1=无常数无趋势, 0=有常数, 1=有趋势)
        k_ar_diff: VAR 差分滞后阶数
 
    返回:
        trace_stat: 迹检验统计量
        cv_trace: 迹检验临界值
        eigen_stat: 最大特征值检验统计量
        cv_eigen: 最大特征值检验临界值
    """
    result = coint_johansen(data, det_order=det_order, k_ar_diff=k_ar_diff)
 
    # 迹检验结果
    print("=== Johansen 迹检验 ===")
    for i in range(len(result.trace_stat)):
        print(f"r <= {i}: 统计量={result.trace_stat[i]:.4f}, "
              f"5%临界值={result.cvt[i, 1]:.4f}, "
              f"{'拒绝 H0 (存在协整)' if result.trace_stat[i] > result.cvt[i, 1] else '不拒绝 H0'}")
 
    # 最大特征值检验结果
    print("\n=== Johansen 最大特征值检验 ===")
    for i in range(len(result.max_eig_stat)):
        print(f"r <= {i}: 统计量={result.max_eig_stat[i]:.4f}, "
              f"5%临界值={result.cvm[i, 1]:.4f}, "
              f"{'拒绝 H0' if result.max_eig_stat[i] > result.cvm[i, 1] else '不拒绝 H0'}")
 
    return result
 
# 测试:两只协整股票
data = np.column_stack([price_A, price_B])
johansen_test(data, det_order=0, k_ar_diff=1)
=== Johansen 迹检验 ===
r <= 0: 统计量=32.4521, 5%临界值=15.4943, 拒绝 H0 (存在协整)
r <= 1: 统计量=3.2105, 5%临界值=3.8415, 不拒绝 H0

=== Johansen 最大特征值检验 ===
r <= 0: 统计量=29.2416, 5%临界值=14.2646, 拒绝 H0
r <= 1: 统计量=3.2105, 5%临界值=3.8415, 不拒绝 H0

结果解读:r=0 被拒绝(至少存在 1 个协整关系),r=1 不被拒绝(不存在 2 个协整关系)。结论:两组序列之间存在 1 个协整关系


三、价差建模

3.1 简单价差

最常见的价差定义:

其中 就是对冲比率。 告诉你:做多 1 单位的 Y,需要做空 单位的 X 来对冲方向性风险。

3.2 对冲比率 的估计

方法一:OLS 静态估计

最简单,用全样本回归:

def ols_hedge_ratio(y, x):
    """用 OLS 估计静态对冲比率"""
    x_const = sm.add_constant(x)
    model = OLS(y, x_const).fit()
    return model.params[1]

问题:用全样本数据估计,隐含了未来信息(未来数据不应该用来估计当前的 beta)。

方法二:OLS 滚动窗口估计

用最近 N 天的数据估计,每天更新:

def rolling_hedge_ratio(y, x, window=60):
    """
    滚动窗口 OLS 估计对冲比率
 
    参数:
        y, x: 价格序列
        window: 滚动窗口大小
    返回:
        beta_series: 每个时点的对冲比率
    """
    beta_series = np.full(len(y), np.nan)
 
    for t in range(window, len(y)):
        y_window = y[t-window:t]
        x_window = x[t-window:t]
        beta = ols_hedge_ratio(y_window, x_window)
        beta_series[t] = beta
 
    return beta_series

方法三:Kalman Filter 估计时变 beta

这是最灵活的方法。Kalman Filter 在每个时点利用新信息递归更新 beta 的估计,并给出估计的不确定性。

def kalman_hedge_ratio(y, x, delta=0.0001, sigma_w=0.01, sigma_v=0.1):
    """
    Kalman Filter 估计时变对冲比率
 
    参数:
        y: 被解释变量(股票 Y 的价格)
        x: 解释变量(股票 X 的价格)
        delta: 状态转移矩阵中的衰减参数
        sigma_w: 状态噪声标准差
        sigma_v: 观测噪声标准差
 
    返回:
        beta_smooth: 平滑后的对冲比率序列
        beta_filtered: 滤波后的对冲比率序列
        Q: 对冲比率的估计方差序列
    """
    n = len(y)
    # 状态向量:[beta, alpha]
    # 观测方程: y_t = [x_t, 1] @ [beta_t, alpha_t]^T + v_t
    # 状态方程: [beta_t, alpha_t] = [1-delta, 0; 0, 1-delta] @ [beta_{t-1}, alpha_{t-1}] + w_t
 
    # 初始化
    x_mat = np.column_stack([x, np.ones(n)])  # 观测矩阵 (n, 2)
    state = np.array([0.0, 0.0])              # 初始状态 [beta, alpha]
    P = np.eye(2) * 100                        # 初始状态协方差
    F = np.array([[1 - delta, 0],              # 状态转移矩阵
                   [0, 1 - delta]])
    Q = np.array([[sigma_w**2, 0],             # 状态噪声协方差
                   [0, sigma_w**2]])
    R = sigma_v**2                              # 观测噪声方差
 
    beta_filtered = np.zeros(n)
    beta_smooth = np.zeros(n)
    Q_series = np.zeros(n)
 
    for t in range(n):
        H = x_mat[t:t+1, :]  # 当前观测矩阵 (1, 2)
 
        # 预测步骤
        state_pred = F @ state
        P_pred = F @ P @ F.T + Q
 
        # 更新步骤
        y_pred = H @ state_pred
        residual = y[t] - y_pred
        S = H @ P_pred @ H.T + R  # 预测残差方差
        K = P_pred @ H.T / S       # Kalman 增益
 
        state = state_pred + (K * residual).flatten()
        P = P_pred - K @ H @ P_pred
        P = (P + P.T) / 2  # 保证对称
 
        beta_filtered[t] = state[0]
        Q_series[t] = P[0, 0]
 
    # 简单的指数平滑
    for t in range(1, n):
        beta_smooth[t] = 0.9 * beta_smooth[t-1] + 0.1 * beta_filtered[t]
 
    return beta_smooth, beta_filtered, Q_series

3.3 为什么对冲比率不是固定不变的

在实际市场中,两只股票之间的关系会因为以下原因变化:

  • 公司基本面变化:业务结构调整、管理层变动
  • 行业环境变化:政策变化、技术革新
  • 市场情绪变化:某些阶段一只股票的”故事”更受追捧
  • 流动性变化:一只股票的流动性结构改变

用固定 beta 就像用一张旧地图导航——地图已经不是最新的了。


四、交易信号

4.1 Z-score 标准化

把价差转化为”当前偏离了几个标准差”:

其中 是价差的滚动均值和滚动标准差。

def zscore_signals(spread, entry_threshold=2.0, exit_threshold=0.5,
                   lookback=20):
    """
    基于 Z-score 的配对交易信号生成
 
    参数:
        spread: 价差序列
        entry_threshold: 入场阈值(绝对值)
        exit_threshold: 出场阈值(绝对值)
        lookback: 计算滚动均值/标准差的窗口
 
    返回:
        positions: 持仓状态序列 (1=做多价差, -1=做空价差, 0=空仓)
        z_scores: Z-score 序列
    """
    spread_series = pd.Series(spread)
    rolling_mean = spread_series.rolling(lookback).mean()
    rolling_std = spread_series.rolling(lookback).std()
 
    z_scores = (spread_series - rolling_mean) / rolling_std
 
    positions = pd.Series(0.0, index=spread_series.index)
 
    # 入场:Z-score 超过阈值
    # 价差太大 → 做空价差(做空 Y + 做多 beta*X)
    # 价差太小 → 做多价差(做多 Y + 做空 beta*X)
    long_entry = z_scores < -entry_threshold
    short_entry = z_scores > entry_threshold
 
    # 出场:Z-score 回归到阈值内
    long_exit = z_scores > -exit_threshold
    short_exit = z_scores < exit_threshold
 
    # 构建持仓
    current_pos = 0
    for t in range(len(spread_series)):
        if pd.isna(z_scores.iloc[t]):
            positions.iloc[t] = 0
            continue
 
        if current_pos == 0:
            if long_entry.iloc[t]:
                current_pos = 1  # 做多价差
            elif short_entry.iloc[t]:
                current_pos = -1  # 做空价差
        elif current_pos == 1:
            if long_exit.iloc[t]:
                current_pos = 0
        elif current_pos == -1:
            if short_exit.iloc[t]:
                current_pos = 0
 
        positions.iloc[t] = current_pos
 
    return positions, z_scores

4.2 半衰期计算

半衰期告诉你:价差偏离后,平均需要多长时间回归一半。

其中 是 OU 过程的回归速度参数。半衰期越短,回归越快,策略越容易获利。

def calculate_half_life(spread):
    """
    通过 OLS 回归估计价差的半衰期
 
    原理: 对 OU 过程离散化后,
    spread[t] - spread[t-1] = theta * (mu - spread[t-1]) * dt + noise
    简化为: spread[t] - spread[t-1] = -theta * spread[t-1] * dt + const + noise
    即: delta_spread = -theta * lagged_spread + const
 
    参数:
        spread: 价差序列
    返回:
        half_life: 半衰期(天)
        theta: 回归速度
    """
    spread_series = pd.Series(spread)
    lagged_spread = spread_series.shift(1)
    delta_spread = spread_series.diff()
 
    # 去掉 NaN
    valid = pd.DataFrame({'delta': delta_spread, 'lagged': lagged_spread}).dropna()
 
    if len(valid) < 10:
        return np.nan, np.nan
 
    # OLS 回归: delta = a + b * lagged
    X = sm.add_constant(valid['lagged'])
    model = OLS(valid['delta'], X).fit()
    b = model.params['lagged']
 
    # theta = -b / dt (dt=1 天)
    theta = -b
 
    if theta <= 0:
        return np.inf, theta  # 没有均值回归
 
    half_life = np.log(2) / theta
    return half_life, theta
 
# 示例
half_life, theta = calculate_half_life(spread)
print(f"回归速度 theta: {theta:.4f}")
print(f"半衰期: {half_life:.1f} 天")
回归速度 theta: 0.0523
半衰期: 13.3 天

半衰期的实际意义:

半衰期含义策略含义
< 5 天回归很快适合短周期交易
5-20 天回归适中需要耐心持有
> 30 天回归很慢可能不适合交易
无穷大没有回归关系可能已经断裂

五、配对筛选

在实际交易中,你需要从大量股票中筛选出”好的配对”。

5.1 距离法

计算两只股票的标准化价格序列之间的距离:

def distance_method(prices_dict, lookback=252):
    """
    距离法筛选配对
 
    参数:
        prices_dict: {股票名: 价格序列} 的字典
        lookback: 回看窗口
 
    返回:
        pairs: 排序后的配对列表 [(股票A, 股票B, 距离), ...]
    """
    stocks = list(prices_dict.keys())
    pairs = []
 
    for i in range(len(stocks)):
        for j in range(i + 1, len(stocks)):
            s1 = pd.Series(prices_dict[stocks[i]]).iloc[-lookback:]
            s2 = pd.Series(prices_dict[stocks[j]]).iloc[-lookback:]
 
            # 对齐索引
            s1, s2 = s1.align(s2, join='inner')
 
            if len(s1) < 100:
                continue
 
            # 标准化
            s1_norm = (s1 - s1.mean()) / s1.std()
            s2_norm = (s2 - s2.mean()) / s2.std()
 
            # 计算欧氏距离
            distance = np.sqrt(np.sum((s1_norm - s2_norm) ** 2))
            pairs.append((stocks[i], stocks[j], distance))
 
    pairs.sort(key=lambda x: x[2])  # 距离从小到大排序
    return pairs

5.2 协整法

用 Engle-Granger 检验筛选:

def cointegration_screening(prices_dict, p_threshold=0.05, min_obs=200):
    """
    协整法筛选配对
 
    参数:
        prices_dict: {股票名: 价格序列} 的字典
        p_threshold: ADF 检验的 p 值阈值
        min_obs: 最少观测数量
 
    返回:
        pairs: 协整配对列表 [(股票A, 股票B, p值, beta), ...]
    """
    stocks = list(prices_dict.keys())
    pairs = []
 
    for i in range(len(stocks)):
        for j in range(i + 1, len(stocks)):
            s1 = pd.Series(prices_dict[stocks[i]])
            s2 = pd.Series(prices_dict[stocks[j]])
 
            s1, s2 = s1.align(s2, join='inner')
 
            if len(s1) < min_obs:
                continue
 
            result = engle_granger_test(s1.values, s2.values)
 
            if result['is_cointegrated']:
                pairs.append((
                    stocks[i], stocks[j],
                    result['p_value'],
                    result['beta']
                ))
 
    # 按 p 值从小到大排序
    pairs.sort(key=lambda x: x[2])
    return pairs

5.3 稳定性检验

筛选出协整配对后,还需要验证协整关系的稳定性。一个在 5 年数据上显著但只在其中 1 年显著的配对,不如一个在 3 年中持续显著的配对。

def stability_test(y, x, window=252, step=63):
    """
    滚动窗口协整稳定性检验
 
    在多个子区间分别做协整检验,
    统计"通过检验的比例"
 
    参数:
        y, x: 价格序列
        window: 子区间窗口
        step: 滚动步长
 
    返回:
        pass_ratio: 通过检验的比例
        details: 每个子区间的检验结果
    """
    y_series = pd.Series(y)
    x_series = pd.Series(x)
    y_series, x_series = y_series.align(x_series, join='inner')
 
    details = []
    t = 0
    while t + window <= len(y_series):
        y_sub = y_series.iloc[t:t + window].values
        x_sub = x_series.iloc[t:t + window].values
 
        result = engle_granger_test(y_sub, x_sub)
        details.append({
            'start': y_series.index[t],
            'end': y_series.index[t + window - 1],
            'p_value': result['p_value'],
            'is_cointegrated': result['is_cointegrated']
        })
        t += step
 
    pass_count = sum(1 for d in details if d['is_cointegrated'])
    pass_ratio = pass_count / len(details) if details else 0
 
    return pass_ratio, details

六、完整配对交易系统

下面用一个完整的例子把所有环节串起来。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
 
 
# ============================================================
# 第 1 步:模拟两只协整股票
# ============================================================
np.random.seed(42)
n = 1000
 
# 生成 OU 过程价差
theta = 0.05   # 回归速度
mu_spread = 0  # 价差均值
sigma_spread = 1.5  # 价差波动率
dt = 1
 
spread = np.zeros(n)
for t in range(1, n):
    spread[t] = (spread[t-1]
                 + theta * (mu_spread - spread[t-1]) * dt
                 + sigma_spread * np.random.normal() * np.sqrt(dt))
 
# 股票 X:随机游走
x_returns = np.random.normal(0.0005, 0.015, n)
price_X = 100 * np.cumprod(1 + x_returns)
 
# 股票 Y:X 的协整配对(Y = X + spread)
price_Y = price_X + spread
 
# 转成 pandas
dates = pd.date_range(start='2020-01-01', periods=n, freq='B')
df = pd.DataFrame({
    'X': price_X,
    'Y': price_Y,
    'spread': spread
}, index=dates)
 
 
# ============================================================
# 第 2 步:协整检验
# ============================================================
# Engle-Granger
result = engle_granger_test(df['Y'].values, df['X'].values)
print(f"=== 协整检验 ===")
print(f"对冲比率 beta: {result['beta']:.4f}")
print(f"ADF 统计量: {result['adf_stat']:.4f}")
print(f"p 值: {result['p_value']:.6f}")
print(f"是否协整: {result['is_cointegrated']}")
 
# 半衰期
half_life, theta_est = calculate_half_life(df['spread'].values)
print(f"\n=== 半衰期 ===")
print(f"估计的 theta: {theta_est:.4f}")
print(f"半衰期: {half_life:.1f} 天")
 
 
# ============================================================
# 第 3 步:信号生成
# ============================================================
entry_threshold = 2.0
exit_threshold = 0.5
lookback = 20
 
positions, z_scores = zscore_signals(
    df['spread'].values,
    entry_threshold=entry_threshold,
    exit_threshold=exit_threshold,
    lookback=lookback
)
 
 
# ============================================================
# 第 4 步:回测
# ============================================================
# 计算价差收益
spread_returns = df['spread'].diff()
 
# 交易成本(双边,基点)
cost_bps = 20  # 20 个基点 = 0.2%
 
# 计算策略收益
strategy_returns = pd.Series(0.0, index=df.index)
prev_pos = 0
for t in range(1, len(df)):
    current_pos = positions.iloc[t]
 
    # 持仓收益
    strategy_returns.iloc[t] = prev_pos * spread_returns.iloc[t]
 
    # 换仓成本
    if current_pos != prev_pos and prev_pos != 0:
        # 平仓 + 开仓 = 2 次交易
        cost = cost_bps / 10000 * 2
        strategy_returns.iloc[t] -= cost * abs(prev_pos)
 
    if current_pos != 0 and prev_pos == 0:
        # 开仓成本
        cost = cost_bps / 10000
        strategy_returns.iloc[t] -= cost * abs(current_pos)
 
    prev_pos = current_pos
 
# 累计收益
cumulative_returns = (1 + strategy_returns).cumprod()
 
 
# ============================================================
# 第 5 步:绩效评估
# ============================================================
total_return = cumulative_returns.iloc[-1] - 1
annual_return = (1 + total_return) ** (252 / n) - 1
daily_std = strategy_returns.std()
annual_std = daily_std * np.sqrt(252)
sharpe = annual_return / annual_std if annual_std > 0 else 0
 
# 最大回撤
peak = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - peak) / peak
max_drawdown = drawdown.min()
 
# 交易统计
n_trades = (positions.diff().abs() > 0).sum() // 2  # 每次开平算一笔
winning_trades = 0
 
print(f"\n=== 绩效评估 ===")
print(f"总收益: {total_return:.2%}")
print(f"年化收益: {annual_return:.2%}")
print(f"年化波动率: {annual_std:.2%}")
print(f"Sharpe 比率: {sharpe:.2f}")
print(f"最大回撤: {max_drawdown:.2%}")
print(f"交易次数: {n_trades}")
 
 
# ============================================================
# 第 6 步:可视化
# ============================================================
fig, axes = plt.subplots(4, 1, figsize=(14, 14), sharex=True)
 
# 价格
axes[0].plot(df.index, df['X'], label='股票 X', alpha=0.8)
axes[0].plot(df.index, df['Y'], label='股票 Y', alpha=0.8)
axes[0].set_title('股票 X 和 Y 的价格')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
 
# 价差和 Z-score
ax1 = axes[1]
ax2 = ax1.twinx()
ax1.plot(df.index, df['spread'], color='blue', alpha=0.6, label='价差')
ax1.axhline(y=df['spread'].mean(), color='gray', linestyle='--', alpha=0.5)
ax2.plot(df.index, z_scores, color='red', alpha=0.6, label='Z-score')
ax2.axhline(y=entry_threshold, color='red', linestyle=':', alpha=0.5)
ax2.axhline(y=-entry_threshold, color='red', linestyle=':', alpha=0.5)
ax2.axhline(y=exit_threshold, color='green', linestyle=':', alpha=0.5)
ax2.axhline(y=-exit_threshold, color='green', linestyle=':', alpha=0.5)
ax1.set_title('价差与 Z-score')
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')
ax1.grid(True, alpha=0.3)
 
# 持仓
axes[2].fill_between(df.index, 0, positions, color='orange', alpha=0.5, step='post')
axes[2].set_title('持仓状态 (1=做多价差, -1=做空价差)')
axes[2].grid(True, alpha=0.3)
 
# 累计收益
axes[3].plot(cumulative_returns.index, cumulative_returns, color='green')
axes[3].fill_between(drawdown.index, 1, cumulative_returns / peak,
                     where=(drawdown < 0), color='red', alpha=0.3)
axes[3].set_title('策略累计收益')
axes[3].grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

七、常见失败模式

配对交易在实际中容易踩的坑:

失败模式症状原因
协整关系断裂策略突然持续亏损基本面变化导致关系不再成立
假配对样本内好,样本外差数据挖掘/过拟合
回归太慢半衰期过长,成本吞噬价差回归速度不够快
对冲比率漂移用固定 beta 但实际关系在变没有及时更新对冲比率
相关性陷阱用相关性筛选配对相关 ≠ 协整
事件冲击个股事件导致价差急剧偏离配对中一只股票有特异事件

最重要的经验:配对交易不是”找到两个相关股票然后等着赚钱”。它需要持续监控协整关系的稳定性,及时止损,并且交易成本之后仍然要有正期望。


小结

概念要点
相关性 vs 协整相关性 = 一起涨跌;协整 = 距离稳定。交易需要后者
Engle-Granger回归 → 检验残差平稳性。简单但不对称
Johansen对称的多元协整检验。更严格
对冲比率OLS 静态 → 滚动 OLS → Kalman Filter(推荐后者)
Z-score 信号入场 ±2,出场 ±0.5(可调)
半衰期衡量回归速度,决定策略是否可行
配对筛选距离法 + 协整法 + 稳定性检验

→ 下一章:02-均值回归模型 — 配对交易背后的数学基础