统计套利详解

最经典的量化策略:从数学原理到完整代码实现

学习目标

  • 理解统计套利和配对交易的核心原理
  • 掌握协整检验的数学原理和 Python 实现
  • 学会构建完整的配对交易系统
  • 了解统计套利的风险和 ML 增强方法

一、配对交易原理

1.1 统计套利的核心思想

统计套利(Statistical Arbitrage)基于一个基本观察:某些资产之间存在长期稳定的关系,当这种关系被打破时,就存在交易机会。

┌─────────────────────────────────────────────────────────────┐
│                    统计套利的核心逻辑                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  正常状态:股票A和股票B价格关系稳定                         │
│                                                             │
│        A ───────────────────────  B                         │
│           ↖                    ↗                            │
│             ↖                ↗                              │
│               价差稳定在一定范围                              │
│                                                             │
│  异常状态:价差异常扩大                                      │
│                                                             │
│        A ─────────────────      B ←─────  套利机会           │
│           ↖                      ↗                          │
│             ↖  价差异常扩大    ↗                             │
│                                                             │
│  交易动作:                                                  │
│  • 做空"相对昂贵"的资产                                     │
│  • 做多"相对便宜"的资产                                     │
│  • 等待价差回归平仓获利                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.2 什么是价差的平稳性

**平稳性(Stationarity)**是统计套利的数学基础。

非平稳序列(如股票价格):

  • 均值随时间变化
  • 方差随时间变化
  • 有趋势,不回归任何固定值

平稳序列(如价差):

  • 均值恒定
  • 方差恒定
  • 围绕均值波动,有回归倾向

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
 
# 对比非平稳和平稳序列
np.random.seed(42)
n = 500
 
# 非平稳:随机游走(股票价格)
non_stationary = pd.Series(
    np.cumsum(np.random.normal(0, 1, n)),
    index=pd.date_range('2023-01-01', periods=n, freq='D'),
    name='Stock Price'
)
 
# 平稳:Ornstein-Uhlenbeck 过程(价差)
stationary = []
x = 0
theta = 0.1  # 回归速度
mu = 0       # 均值
sigma = 1    # 波动率
 
for _ in range(n):
    dx = theta * (mu - x) + sigma * np.random.normal(0, 1)
    x += dx
    stationary.append(x)
 
stationary = pd.Series(
    stationary,
    index=pd.date_range('2023-01-01', periods=n, freq='D'),
    name='Spread'
)
 
# 可视化对比
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
 
axes[0].plot(non_stationary.index, non_stationary.values)
axes[0].set_title('非平稳序列:股票价格(随机游走)')
axes[0].set_ylabel('Price')
axes[0].grid(True, alpha=0.3)
axes[0].text(non_stationary.index[50], non_stationary.max() * 0.9,
              '• 无固定均值\n• 有趋势\n• 不回归', fontsize=10,
              bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
 
axes[1].plot(stationary.index, stationary.values)
axes[1].axhline(y=0, color='red', linestyle='--', alpha=0.7, label='Mean')
axes[1].fill_between(stationary.index, -2, 2, alpha=0.1, color='green')
axes[1].set_title('平稳序列:价差(均值回归)')
axes[1].set_ylabel('Spread')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].text(stationary.index[50], stationary.max() * 0.8,
              '• 固定均值 (0)\n• 围绕均值波动\n• 有回归倾向', fontsize=10,
              bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
 
plt.tight_layout()
plt.show()

1.3 Ornstein-Uhlenbeck 过程

价差的行为可以用 OU 过程描述:

    dX_t = θ(μ - X_t) dt + σ dW_t

其中:

  • X_t = 价差在时间 t 的值
  • μ = 价差的长期均值
  • θ = 回归速度(越大回归越快)
  • σ = 波动率
  • dW_t = 维纳过程增量

OU 过程的离散化形式:

    X_{t+1} = X_t + θ(μ - X_t)Δt + σ√Δt · ε_t

其中 ε_t ~ N(0, 1)。

def simulate_ou_process(n_periods, theta=0.1, mu=0, sigma=1, x0=0, dt=1):
    """
    模拟 Ornstein-Uhlenbeck 过程
 
    参数:
        n_periods: 模拟期数
        theta: 回归速度 (0-1,越大回归越快)
        mu: 长期均值
        sigma: 扩散系数(波动率)
        x0: 初始值
        dt: 时间步长
 
    返回:
        ou_series: OU 过程序列
    """
    x = x0
    ou_values = []
 
    for _ in range(n_periods):
        dx = theta * (mu - x) * dt + sigma * np.sqrt(dt) * np.random.normal(0, 1)
        x += dx
        ou_values.append(x)
 
    return pd.Series(ou_values)
 
 
# 比较不同回归速度
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
 
thetas = [0.01, 0.05, 0.2, 0.5]
titles = ['弱回归 (θ=0.01)', '中等回归 (θ=0.05)', '强回归 (θ=0.2)', '很强回归 (θ=0.5)']
 
for i, (ax, theta, title) in enumerate(zip(axes.flat, thetas, titles)):
    np.random.seed(42 + i)
    ou = simulate_ou_process(500, theta=theta, mu=0, sigma=2)
    ax.plot(ou.index, ou.values, alpha=0.7)
    ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
 
    # 计算半衰期
    half_life = np.log(2) / theta if theta > 0 else float('inf')
 
    ax.set_title(f'{title}\n半衰期 ≈ {half_life:.1f} 期')
    ax.grid(True, alpha=0.3)
 
plt.suptitle('不同回归速度的 OU 过程', fontsize=14)
plt.tight_layout()
plt.show()

半衰期(Half-Life):价差偏离均值后,偏离度衰减到一半所需的时间,即 半衰期 = ln(2) / θ:

def calculate_half_life(spread):
    """
    计算价差的半衰期
 
    通过回归估计: spread_{t+1} - spread_t = theta * (mu - spread_t) + epsilon
    """
    # 准备数据
    y = spread.diff().dropna()
    x = spread.shift(1).dropna()
 
    # 对齐
    y = y[y.index.isin(x.index)]
    x = x[x.index.isin(y.index)]
 
    # 线性回归
    from scipy import stats
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
 
    # theta = -slope
    theta = -slope
 
    if theta > 0:
        half_life = np.log(2) / theta
        return half_life, theta
    else:
        return float('inf'), theta
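可以用一个独立的数值实验验证上述估计方法:模拟 θ=0.1 的 OU 过程(理论半衰期 ln2/0.1 ≈ 6.9 期),再用同样的回归法反推半衰期(此处内联复刻上面函数的核心逻辑,便于单独运行):

```python
import numpy as np
import pandas as pd
from scipy import stats

np.random.seed(0)
theta_true = 0.1

# 模拟足够长的 OU 过程,降低估计误差
x, values = 0.0, []
for _ in range(5000):
    x += theta_true * (0.0 - x) + np.random.normal(0, 1)
    values.append(x)
spread = pd.Series(values)

# 回归 Δspread_t = a + b * spread_{t-1},则 θ 的估计值为 -b
y = spread.diff().dropna()
x_lag = spread.shift(1).dropna()
slope, intercept, r_value, p_value, std_err = stats.linregress(x_lag, y)
theta_hat = -slope
half_life = np.log(2) / theta_hat

print(f"理论半衰期: {np.log(2) / theta_true:.1f} 期")
print(f"估计半衰期: {half_life:.1f} 期")
```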

二、协整检验 (Cointegration Test)

协整是统计套利的数学基础。协整 ≠ 相关性,这是初学者最容易混淆的概念。

2.1 协整 vs 相关性

| 维度 | 协整 (Cointegration) | 相关性 (Correlation) |
| --- | --- | --- |
| 定义 | 长期均衡关系 | 线性关系强度 |
| 关注点 | 价差的平稳性 | 变动方向一致性 |
| 序列属性要求 | 序列是非平稳的 | 无要求 |
| 应用 | 配对交易、统计套利 | 投资组合分散 |
| 典型例子 | 股票价格与期货价格 | 同行业股票走势 |

关键区别

相关性高 ≠ 协整关系
- 两只股票可能高度相关(同涨同跌)
- 但价差可能持续扩大(没有协整关系)
- 这种情况下做配对交易会亏损

协整关系强 ≠ 相关性高
- 理论上协整的资产可能有短期低相关
- 但长期价差会回归

2.2 数学定义

两个时间序列 X_t 和 Y_t 协整,如果:

  1. X_t 和 Y_t 都是 I(1) 序列(一阶差分平稳)
  2. 存在线性组合 Z_t = Y_t - β X_t 是 I(0) 序列(平稳)

直觉理解:虽然两个序列各自都在"漫游",但它们之间保持某种关系,不会无限偏离。

2.3 Engle-Granger 两步法

最常用的协整检验方法。

步骤一:回归求长期关系

    Y_t = α + β X_t + ε_t

步骤二:检验残差平稳性

对残差 ε̂_t = Y_t - α̂ - β̂ X_t 做 ADF 检验。如果 ε̂_t 平稳,则 X_t 与 Y_t 协整。

2.3.1 完整 Python 实现

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller
from scipy import stats
import matplotlib.pyplot as plt
 
def engle_granger_cointegration_test(series1, series2, significance_level=0.05):
    """
    Engle-Granger 两步法协整检验
 
    参数:
        series1, series2: 两个价格序列
        significance_level: 显著性水平
 
    返回:
        results: 包含检验结果的字典
    """
    # 确保对齐
    df = pd.DataFrame({'y': series1, 'x': series2}).dropna()
    y = df['y'].values
    x = df['x'].values
 
    results = {
        'series1_name': series1.name if hasattr(series1, 'name') else 'Series1',
        'series2_name': series2.name if hasattr(series2, 'name') else 'Series2',
        'is_cointegrated': False,
        'hedge_ratio': None,
        'p_value': None,
        'critical_values': None,
        'spread': None,
        'test_statistic': None
    }
 
    # ===== 步骤一:长期关系回归 =====
    # 添加常数项
    x_with_const = np.column_stack([np.ones(len(x)), x])
 
    # OLS 回归
    beta_hat = np.linalg.lstsq(x_with_const, y, rcond=None)[0]
    alpha, beta = beta_hat[0], beta_hat[1]
 
    # 计算残差(价差)
    spread = y - alpha - beta * x
 
    # ===== 步骤二:ADF 检验残差平稳性 =====
    # 注意:ADF 分布不标准,需要用 MacKinnon 临界值
    adf_result = adfuller(spread, maxlag=1, regression='c')
 
    results['test_statistic'] = adf_result[0]
    results['p_value'] = adf_result[1]
    results['critical_values'] = adf_result[4]
    results['hedge_ratio'] = beta
    results['spread'] = pd.Series(spread, index=df.index)
    results['alpha'] = alpha
 
    # 判断是否协整
    # 注:实际应使用 Engle-Granger 临界值,这里简化使用 ADF 临界值
    if results['p_value'] < significance_level:
        results['is_cointegrated'] = True
 
    return results
 
 
def plot_cointegration_analysis(series1, series2, results):
    """可视化协整分析结果"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
 
    # 1. 原始价格
    axes[0, 0].plot(series1.index, series1.values, label=results['series1_name'], alpha=0.7)
    axes[0, 0].plot(series2.index, series2.values, label=results['series2_name'], alpha=0.7)
    axes[0, 0].set_title('原始价格序列')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
 
    # 2. 协整关系
    normalized_series2 = results['alpha'] + results['hedge_ratio'] * series2
    axes[0, 1].plot(series1.index, series1.values, label=results['series1_name'], alpha=0.7)
    axes[0, 1].plot(series2.index, normalized_series2.values,
                    label=f"{results['series2_name']} (adjusted)", alpha=0.7)
    axes[0, 1].set_title(f'协整关系 (对冲比率 = {results["hedge_ratio"]:.3f})')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
 
    # 3. 价差
    spread = results['spread']
    spread_mean = spread.mean()
    spread_std = spread.std()
 
    axes[1, 0].plot(spread.index, spread.values, label='Spread', color='steelblue')
    axes[1, 0].axhline(y=spread_mean, color='red', linestyle='--', label='Mean', alpha=0.7)
    axes[1, 0].axhline(y=spread_mean + 2*spread_std, color='green', linestyle=':', label='±2σ', alpha=0.5)
    axes[1, 0].axhline(y=spread_mean - 2*spread_std, color='green', linestyle=':', alpha=0.5)
    axes[1, 0].fill_between(spread.index, spread_mean - 2*spread_std, spread_mean + 2*spread_std, alpha=0.1)
    axes[1, 0].set_title('价差(残差)')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
 
    # 4. ADF 检验结果
    axes[1, 1].axis('off')
    test_text = f"""
    ADF 检验结果:
 
    检验统计量: {results['test_statistic']:.4f}
    p 值: {results['p_value']:.4f}
 
    临界值:
    • 1%: {results['critical_values']['1%']:.4f}
    • 5%: {results['critical_values']['5%']:.4f}
    • 10%: {results['critical_values']['10%']:.4f}
 
    结论: {'✅ 协整关系存在' if results['is_cointegrated'] else '❌ 无协整关系'}
 
    对冲比率 (β): {results['hedge_ratio']:.4f}
    """
    axes[1, 1].text(0.1, 0.5, test_text, fontsize=12, verticalalignment='center',
                    bbox=dict(boxstyle='round', facecolor='wheat' if results['is_cointegrated'] else 'lightcoral', alpha=0.5))
 
    plt.suptitle('协整分析报告', fontsize=14)
    plt.tight_layout()
    plt.show()
 
    return fig
 
 
# ============ 模拟协整数据演示 ============
np.random.seed(42)
n_periods = 500
 
# 生成协整数据
# y_t = α + β * x_t + ε_t,其中 ε_t 服从平稳过程
 
# x: 随机游走
x = np.cumsum(np.random.normal(0, 1, n_periods))
 
# y: 与 x 协整
beta_true = 1.5
alpha_true = 10
noise = np.random.normal(0, 2, n_periods)
y = alpha_true + beta_true * x + noise
 
# 转换为 Series
dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
series_x = pd.Series(x, index=dates, name='Stock_X')
series_y = pd.Series(y, index=dates, name='Stock_Y')
 
# 运行协整检验
results = engle_granger_cointegration_test(series_y, series_x)
print(f"协整检验结果: {'通过' if results['is_cointegrated'] else '未通过'}")
print(f"估计对冲比率: {results['hedge_ratio']:.4f} (真实值: {beta_true})")
print(f"ADF p-value: {results['p_value']:.6f}")
 
# 可视化
plot_cointegration_analysis(series_y, series_x, results)
 
 
# ============ 对比:非协整数据 ============
# 生成两个独立的随机游走
np.random.seed(123)
x_non = np.cumsum(np.random.normal(0, 1, n_periods))
y_non = np.cumsum(np.random.normal(0, 1, n_periods)) + 100
 
series_x_non = pd.Series(x_non, index=dates, name='Stock_X_Non')
series_y_non = pd.Series(y_non, index=dates, name='Stock_Y_Non')
 
# 检验
results_non = engle_granger_cointegration_test(series_y_non, series_x_non)
print(f"\n非协整检验结果: {'通过' if results_non['is_cointegrated'] else '未通过'}")
print(f"ADF p-value: {results_non['p_value']:.6f}")

2.4 Johansen 检验(多变量协整)

当涉及多个资产时,使用 Johansen 检验更合适。

from statsmodels.tsa.vector_ar.vecm import coint_johansen
 
def johansen_cointegration_test(prices_df, det_order=0, k_ar_diff=1):
    """
    Johansen 协整检验(多变量)
 
    参数:
        prices_df: 价格 DataFrame (每列一个资产)
        det_order: 确定性项阶数 (-1=无, 0=常数, 1=线性趋势)
        k_ar_diff: VAR 滞后阶数
 
    返回:
        results: 检验结果
    """
    # 准备数据
    data = prices_df.values
 
    # Johansen 检验
    result = coint_johansen(data, det_order, k_ar_diff)
 
    results = {
        'eig': result.eig,    # 特征值
        'lr1': result.lr1,    # 迹统计量
        'lr2': result.lr2,    # 最大特征值统计量
        'cvt': result.cvt,    # 迹统计量临界值 (90%, 95%, 99%)
        'cvm': result.cvm,    # 最大特征值临界值
        'evec': result.evec   # 特征向量(协整向量)
    }

    # 解释结果:依次检验 H0: 协整秩 ≤ r
    r_trace = []
    for i, (stat, crit) in enumerate(zip(results['lr1'], results['cvt'][:, 1])):  # 使用 95% 临界值
        r_trace.append({
            'rank': i,
            'trace_statistic': stat,
            'critical_value_95': crit,
            'reject': stat > crit
        })

    results['trace_test'] = r_trace

    # 估计协整秩:从 r=0 起依次检验,第一个无法拒绝 H0 的 r 即为估计秩
    rank = len(r_trace)
    for r in r_trace:
        if not r['reject']:
            rank = r['rank']
            break
    results['cointegration_rank'] = rank
 
    return results
 
 
# 演示 Johansen 检验
np.random.seed(42)
n_periods = 500
 
# 生成三个协整的序列
# 共同因子
common_factor = np.cumsum(np.random.normal(0, 1, n_periods))
 
# 三个序列受共同因子驱动
z1 = 100 + 1.0 * common_factor + np.random.normal(0, 2, n_periods)
z2 = 80 + 1.2 * common_factor + np.random.normal(0, 1.5, n_periods)
z3 = 120 + 0.8 * common_factor + np.random.normal(0, 2.5, n_periods)
 
dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
prices_3 = pd.DataFrame({
    'Stock_A': z1,
    'Stock_B': z2,
    'Stock_C': z3
}, index=dates)
 
# Johansen 检验
results_johansen = johansen_cointegration_test(prices_3)
 
print("Johansen 协整检验结果:")
print(f"估计的协整秩: {results_johansen['cointegration_rank']}")
print("\n迹统计量检验:")
for r in results_johansen['trace_test']:
    print(f"  H0: 协整秩 ≤ {r['rank']}")
    print(f"    迹统计量: {r['trace_statistic']:.2f}")
    print(f"    95% 临界值: {r['critical_value_95']:.2f}")
    print(f"    结论: {'拒绝 H0' if r['reject'] else '无法拒绝 H0'}")
 
# 协整向量(标准化第一个为1)
print("\n协整向量:")
for i, vec in enumerate(results_johansen['evec'].T):
    vec_norm = vec / vec[0]
    print(f"  向量 {i+1}: {vec_norm}")

三、配对选择方法

找到好的配对是成功的关键。

3.1 距离法(Distance Method)

逻辑:历史上价格走势接近的股票,可能是好的配对。

def distance_method_pairing(prices_df, lookback=252, top_n=10):
    """
    距离法选择配对
 
    度量:价格序列的欧氏距离(或 SSD:平方和距离)
 
    参数:
        prices_df: 价格 DataFrame
        lookback: 回看期
        top_n: 返回前 N 个配对
 
    返回:
        pairs: 配对列表 [(stock1, stock2, distance), ...]
    """
    # 使用最近 lookback 期的数据
    recent_prices = prices_df.tail(lookback)
 
    # 标准化价格(初始价格都为1)
    normalized = recent_prices / recent_prices.iloc[0]
 
    # 计算所有配对的距离
    n_stocks = len(prices_df.columns)
    distances = []
 
    for i in range(n_stocks):
        for j in range(i + 1, n_stocks):
            stock1 = prices_df.columns[i]
            stock2 = prices_df.columns[j]
 
            # SSD 距离
            diff = normalized[stock1] - normalized[stock2]
            ssd = np.sum(diff ** 2)
 
            distances.append((stock1, stock2, ssd))
 
    # 排序并返回 top_n
    distances.sort(key=lambda x: x[2])
    return distances[:top_n]
 
 
# 演示距离法
np.random.seed(42)
n_stocks = 20
n_periods = 500
 
# 生成一些相似的股票对
prices_dict = {}
for i in range(n_stocks):
    if i % 3 == 0:
        # 创建配对
        base = np.cumsum(np.random.normal(0.05, 1, n_periods))
        prices_dict[f'Stock_{i}'] = 100 + base + np.random.normal(0, 1, n_periods)
        prices_dict[f'Stock_{i+1}'] = 100 + base * 0.95 + np.random.normal(0, 1, n_periods)
    else:
        prices_dict[f'Stock_{i}'] = 100 + np.cumsum(np.random.normal(0, 1, n_periods))
 
prices_demo = pd.DataFrame(prices_dict)
 
# 使用距离法
pairs = distance_method_pairing(prices_demo, lookback=252, top_n=5)
 
print("距离法选择的配对:")
for i, (s1, s2, dist) in enumerate(pairs, 1):
    print(f"{i}. {s1} - {s2}: 距离 = {dist:.2f}")

3.2 协整法

逻辑:直接检验协整关系,选择协整性强的配对。

def cointegration_method_pairing(prices_df, significance_level=0.05, top_n=10):
    """
    协整法选择配对
 
    对所有可能配对进行协整检验,选择 p 值最小的
 
    参数:
        prices_df: 价格 DataFrame
        significance_level: 显著性水平
        top_n: 返回前 N 个配对
 
    返回:
        pairs: 配对列表 [(stock1, stock2, hedge_ratio, p_value), ...]
    """
    n_stocks = len(prices_df.columns)
    results = []
 
    for i in range(n_stocks):
        for j in range(i + 1, n_stocks):
            stock1 = prices_df.columns[i]
            stock2 = prices_df.columns[j]
 
            # 协整检验
            test_result = engle_granger_cointegration_test(
                prices_df[stock1],
                prices_df[stock2],
                significance_level=significance_level
            )
 
            if test_result['is_cointegrated']:
                results.append((
                    stock1,
                    stock2,
                    test_result['hedge_ratio'],
                    test_result['p_value']
                ))
 
    # 按 p 值排序
    results.sort(key=lambda x: x[3])
    return results[:top_n]
 
 
# 演示
pairs_coint = cointegration_method_pairing(prices_demo, significance_level=0.05, top_n=5)
 
print("\n协整法选择的配对:")
for i, (s1, s2, hedge, pval) in enumerate(pairs_coint, 1):
    print(f"{i}. {s1} - {s2}: 对冲比 = {hedge:.3f}, p 值 = {pval:.4f}")

3.3 机器学习法(聚类)

逻辑:使用聚类算法将相似的股票分组,然后在组内选择配对。

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
 
def clustering_based_pairing(prices_df, n_clusters=5, top_per_cluster=2):
    """
    基于聚类的配对选择
 
    步骤:
    1. 提取特征(收益率、波动率等)
    2. 聚类
    3. 在同一聚类内寻找配对
 
    参数:
        prices_df: 价格 DataFrame
        n_clusters: 聚类数量
        top_per_cluster: 每个聚类返回的配对数
 
    返回:
        pairs: 配对列表
    """
    # 特征工程
    returns = prices_df.pct_change()
    features = pd.DataFrame(index=prices_df.columns)
 
    # 各类特征
    features['mean_return'] = returns.mean()
    features['volatility'] = returns.std()
    features['skewness'] = returns.skew()
    features['kurtosis'] = returns.kurt()
 
    # 添加一些技术指标特征
    for col in prices_df.columns:
        price = prices_df[col]
        features.loc[col, 'trend'] = (price.iloc[-1] / price.iloc[0] - 1)
 
    # 标准化
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
 
    # 聚类
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    clusters = kmeans.fit_predict(features_scaled)
 
    features['cluster'] = clusters
 
    # 在每个聚类内找配对
    pairs = []
 
    for cluster_id in range(n_clusters):
        cluster_stocks = features[features['cluster'] == cluster_id].index.tolist()
 
        if len(cluster_stocks) >= 2:
            # 在聚类内计算距离
            for i in range(len(cluster_stocks)):
                for j in range(i + 1, len(cluster_stocks)):
                    s1, s2 = cluster_stocks[i], cluster_stocks[j]
 
                    # 计算特征距离
                    dist = np.linalg.norm(
                        features_scaled[features.index.get_loc(s1)] -
                        features_scaled[features.index.get_loc(s2)]
                    )
 
                    pairs.append((s1, s2, cluster_id, dist))
 
    # 按距离排序
    pairs.sort(key=lambda x: x[3])
 
    return pairs[:top_per_cluster * n_clusters]
 
 
# 可视化聚类
def visualize_clustering(prices_df, n_clusters=5):
    """可视化股票聚类结果"""
    # 特征
    returns = prices_df.pct_change()
    features = pd.DataFrame(index=prices_df.columns)
    features['mean_return'] = returns.mean()
    features['volatility'] = returns.std()
 
    # 聚类
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    clusters = kmeans.fit_predict(features_scaled)
 
    # PCA 降维用于可视化
    pca = PCA(n_components=2)
    features_2d = pca.fit_transform(features_scaled)
 
    # 绘图
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(features_2d[:, 0], features_2d[:, 1],
                          c=clusters, cmap='tab10', s=100, alpha=0.7)
 
    # 添加股票标签
    for i, stock in enumerate(prices_df.columns):
        plt.annotate(stock, (features_2d[i, 0], features_2d[i, 1]),
                    fontsize=8, alpha=0.8)
 
    plt.colorbar(scatter, label='Cluster')
    plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
    plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
    plt.title('股票聚类(基于收益-波动特征)')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
 
 
# 演示聚类法
pairs_cluster = clustering_based_pairing(prices_demo, n_clusters=5, top_per_cluster=2)
 
print("\n聚类法选择的配对:")
for i, (s1, s2, cluster_id, dist) in enumerate(pairs_cluster[:10], 1):
    print(f"{i}. {s1} - {s2}: 聚类 {cluster_id}, 特征距离 = {dist:.3f}")
 
visualize_clustering(prices_demo, n_clusters=5)

四、交易信号生成

找到配对后,需要定义交易规则。

4.1 Z-score 入场/出场

def zscore_trading_signals(spread, entry_threshold=2.0, exit_threshold=0.5,
                            stop_loss_threshold=4.0, lookback=20):
    """
    基于 Z-score 的交易信号生成
 
    参数:
        spread: 价差序列
        entry_threshold: 入场 Z-score 阈值(如 ±2)
        exit_threshold: 出场 Z-score 阈值(如 ±0.5)
        stop_loss_threshold: 止损阈值(如 ±4)
        lookback: 均值和标准差的滚动窗口
 
    返回:
        signals: 信号 DataFrame (long_spread, short_spread, position)
    """
    # 计算滚动统计量
    spread_mean = spread.rolling(window=lookback).mean()
    spread_std = spread.rolling(window=lookback).std()
 
    # Z-score
    z_score = (spread - spread_mean) / spread_std
 
    # 初始化信号
    signals = pd.DataFrame(index=spread.index)
    signals['z_score'] = z_score
    signals['spread'] = spread
    signals['position'] = 0  # 1=做多价差, -1=做空价差, 0=空仓
 
    current_position = 0
 
    for i in range(lookback, len(spread)):
        z = z_score.iloc[i]
 
        if current_position == 0:  # 空仓状态
            # 价差异常低 → 做多价差(做多被低估的,做空被高估的)
            if z < -entry_threshold:
                current_position = 1
            # 价差异常高 → 做空价差
            elif z > entry_threshold:
                current_position = -1
 
        elif current_position == 1:  # 持有多头价差
            # 回归到均值 → 平仓
            if z > -exit_threshold:
                current_position = 0
            # 止损
            elif z < -stop_loss_threshold:
                current_position = 0
 
        elif current_position == -1:  # 持有空头价差
            # 回归到均值 → 平仓
            if z < exit_threshold:
                current_position = 0
            # 止损
            elif z > stop_loss_threshold:
                current_position = 0
 
        signals.iloc[i, signals.columns.get_loc('position')] = current_position
 
    return signals
 
 
# 演示信号生成
np.random.seed(42)
# 使用之前协整的价差
spread_series = results['spread']
 
# 生成信号
signals = zscore_trading_signals(spread_series, entry_threshold=2.0,
                                  exit_threshold=0.5, stop_loss_threshold=4.0)
 
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
 
# 价差和 Z-score
ax1 = axes[0]
ax1.plot(signals.index, signals['spread'].values, label='Spread', color='steelblue', alpha=0.7)
ax1.set_ylabel('Spread', color='steelblue')
ax1.tick_params(axis='y', labelcolor='steelblue')
 
ax1_twin = ax1.twinx()
ax1_twin.plot(signals.index, signals['z_score'].values, label='Z-Score', color='orange', alpha=0.7)
ax1_twin.axhline(y=2, color='red', linestyle='--', alpha=0.5, label='Entry (±2)')
ax1_twin.axhline(y=-2, color='green', linestyle='--', alpha=0.5)
ax1_twin.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5, label='Exit (±0.5)')
ax1_twin.axhline(y=-0.5, color='gray', linestyle=':', alpha=0.5)
ax1_twin.set_ylabel('Z-Score', color='orange')
ax1_twin.tick_params(axis='y', labelcolor='orange')
 
 
axes[0].set_title('价差和 Z-Score')
 
# 持仓
axes[1].plot(signals.index, signals['position'].values, drawstyle='steps-post',
             label='Position', linewidth=2)
axes[1].fill_between(signals.index, 0, signals['position'].values,
                     where=signals['position'].values > 0, alpha=0.3, color='green', label='Long Spread')
axes[1].fill_between(signals.index, 0, signals['position'].values,
                     where=signals['position'].values < 0, alpha=0.3, color='red', label='Short Spread')
axes[1].axhline(y=0, color='black', linestyle='-', alpha=0.3)
axes[1].set_ylabel('Position')
axes[1].set_xlabel('Date')
axes[1].set_title('交易信号')
axes[1].legend(loc='upper left')
axes[1].grid(True, alpha=0.3)
axes[1].set_ylim(-1.5, 1.5)
 
plt.tight_layout()
plt.show()

4.2 Bollinger Band 变体

def bollinger_band_signals(spread, lookback=20, num_std=2.0):
    """
    Bollinger Band 信号生成
 
    参数:
        spread: 价差序列
        lookback: 均线和标准差窗口
        num_std: 标准差倍数
 
    返回:
        signals: 信号 DataFrame
    """
    # 计算 BB
    middle = spread.rolling(window=lookback).mean()
    std = spread.rolling(window=lookback).std()
    upper = middle + num_std * std
    lower = middle - num_std * std
 
    # 生成信号
    signals = pd.DataFrame(index=spread.index)
    signals['spread'] = spread
    signals['upper'] = upper
    signals['lower'] = lower
    signals['middle'] = middle
 
    # 信号:触及上轨做空价差,触及下轨做多价差
    signals['position'] = 0
    signals.loc[spread > upper, 'position'] = -1
    signals.loc[spread < lower, 'position'] = 1
 
    # 平仓:价差回到中轨附近(带宽取 0.1 倍滚动标准差;比例带 middle*0.98~1.02 在中轨接近 0 时会失效)
    signals.loc[(spread - middle).abs() < 0.1 * std, 'position'] = 0
 
    return signals
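下面是 Bollinger Band 信号的用法示意(独立脚本,内联复刻 bollinger_band_signals 的核心计算以便单独运行;通道参数与该函数默认值一致):

```python
import numpy as np
import pandas as pd

np.random.seed(1)

# 模拟一段均值回归的价差(OU 过程)
x, values = 0.0, []
for _ in range(300):
    x += 0.1 * (0.0 - x) + np.random.normal(0, 1)
    values.append(x)
spread = pd.Series(values)

# 与 bollinger_band_signals 相同的通道计算
lookback, num_std = 20, 2.0
middle = spread.rolling(lookback).mean()
std = spread.rolling(lookback).std()
upper = middle + num_std * std
lower = middle - num_std * std

# 触及上轨 → 做空价差;触及下轨 → 做多价差
position = pd.Series(0, index=spread.index)
position[spread > upper] = -1
position[spread < lower] = 1

print(f"做多信号天数: {int((position == 1).sum())}")
print(f"做空信号天数: {int((position == -1).sum())}")
```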

五、完整配对交易系统

将所有组件整合在一起。

import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
 
class PairsTradingSystem:
    """
    完整的配对交易系统
 
    功能:
    1. 数据模拟/加载
    2. 协整检验
    3. 信号生成
    4. 回测
    5. 绩效评估
    """
 
    def __init__(self, stock1_name='Stock_A', stock2_name='Stock_B'):
        self.stock1_name = stock1_name
        self.stock2_name = stock2_name
        self.prices1 = None
        self.prices2 = None
        self.hedge_ratio = None
        self.spread = None
        self.signals = None
        self.returns = None
        self.performance = {}
 
    def generate_cointegrated_data(self, n_periods=500, beta=1.5, alpha=10,
                                    noise_std=2, seed=42):
        """
        生成模拟的协整数据
 
        参数:
            n_periods: 数据期数
            beta: 真实对冲比率
            alpha: 截距
            noise_std: 价差噪声标准差
            seed: 随机种子
        """
        np.random.seed(seed)
 
        # 生成随机游走 x
        x = np.cumsum(np.random.normal(0, 1, n_periods))
        x = pd.Series(x, name=self.stock2_name)
 
        # 生成与 x 协整的 y
        spread_noise = np.random.normal(0, noise_std, n_periods)
        y = alpha + beta * x + spread_noise
        y = pd.Series(y, name=self.stock1_name)
 
        dates = pd.date_range('2023-01-01', periods=n_periods, freq='D')
        self.prices1 = pd.Series(y.values, index=dates, name=self.stock1_name)
        self.prices2 = pd.Series(x.values, index=dates, name=self.stock2_name)
 
        return self.prices1, self.prices2
 
    def load_data(self, prices1, prices2):
        """加载实际数据"""
        self.prices1 = prices1
        self.prices2 = prices2
 
    def test_cointegration(self, significance_level=0.05):
        """
        Engle-Granger 协整检验
 
        返回:
            is_cointegrated: 是否协整
            hedge_ratio: 对冲比率
            p_value: p 值
        """
        if self.prices1 is None or self.prices2 is None:
            raise ValueError("请先加载或生成数据")
 
        # 对齐数据
        df = pd.DataFrame({self.stock1_name: self.prices1,
                          self.stock2_name: self.prices2}).dropna()
        y = df[self.stock1_name].values
        x = df[self.stock2_name].values
 
        # OLS 回归
        x_with_const = np.column_stack([np.ones(len(x)), x])
        beta_hat = np.linalg.lstsq(x_with_const, y, rcond=None)[0]
        alpha, beta = beta_hat[0], beta_hat[1]
 
        # 计算价差
        spread = y - alpha - beta * x
        self.spread = pd.Series(spread, index=df.index)
 
        # ADF 检验
        adf_result = adfuller(spread, maxlag=1, regression='c')
        self.hedge_ratio = beta
 
        is_cointegrated = adf_result[1] < significance_level
 
        self.cointegration_results = {
            'is_cointegrated': is_cointegrated,
            'hedge_ratio': beta,
            'alpha': alpha,
            'p_value': adf_result[1],
            'test_statistic': adf_result[0],
            'critical_values': adf_result[4]
        }
 
        return is_cointegrated, beta, adf_result[1]
 
    def generate_signals(self, entry_threshold=2.0, exit_threshold=0.5,
                         stop_loss_threshold=4.0, lookback=20):
        """
        生成交易信号
 
        参数:
            entry_threshold: 入场 Z-score
            exit_threshold: 出场 Z-score
            stop_loss_threshold: 止损 Z-score
            lookback: 滚动窗口
        """
        if self.spread is None:
            raise ValueError("请先运行协整检验")
 
        spread = self.spread
        spread_mean = spread.rolling(window=lookback).mean()
        spread_std = spread.rolling(window=lookback).std()
        z_score = (spread - spread_mean) / spread_std
 
        self.signals = pd.DataFrame(index=spread.index)
        self.signals['spread'] = spread
        self.signals['z_score'] = z_score
        self.signals['position'] = 0
 
        current_position = 0
 
        for i in range(lookback, len(spread)):
            z = z_score.iloc[i]
 
            if current_position == 0:
                if z < -entry_threshold:
                    current_position = 1  # 做多价差
                elif z > entry_threshold:
                    current_position = -1  # 做空价差
 
            elif current_position == 1:
                if z > -exit_threshold or z < -stop_loss_threshold:
                    current_position = 0
 
            elif current_position == -1:
                if z < exit_threshold or z > stop_loss_threshold:
                    current_position = 0
 
            self.signals.iloc[i, self.signals.columns.get_loc('position')] = current_position
 
        return self.signals
 
    def backtest(self, transaction_cost=0.001):
        """
        回测策略
 
        参数:
            transaction_cost: 双边交易成本(比例)
 
        返回:
            returns: 策略收益序列
        """
        if self.signals is None:
            raise ValueError("请先生成信号")
 
        # 计算价格收益
        returns1 = self.prices1.pct_change()
        returns2 = self.prices2.pct_change()
 
        # 对齐
        df = pd.DataFrame({
            'position': self.signals['position'],
            'ret1': returns1,
            'ret2': returns2
        }).dropna()
 
        # 计算策略收益
        # 做多价差 = 做多 stock1 + 做空 hedge_ratio * stock2
        df['strategy_return'] = (
            df['position'] * df['ret1'] -
            df['position'] * self.hedge_ratio * df['ret2']
        )
 
        # 计算交易成本
        df['position_change'] = df['position'].diff().abs()
        df['transaction_cost'] = df['position_change'] * transaction_cost
 
        # 扣除成本后的收益
        df['net_return'] = df['strategy_return'] - df['transaction_cost']
 
        self.returns = df['net_return']
 
        # 计算绩效指标
        self._calculate_performance()
 
        return self.returns
 
    def _calculate_performance(self):
        """计算绩效指标"""
        if self.returns is None:
            return
 
        returns = self.returns
 
        # 基本指标
        total_return = (1 + returns).prod() - 1
        annual_return = returns.mean() * 252
        annual_vol = returns.std() * np.sqrt(252)
        sharpe_ratio = annual_return / annual_vol if annual_vol > 0 else 0
 
        # 最大回撤
        cumulative = (1 + returns).cumprod()
        running_max = cumulative.expanding().max()
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = drawdown.min()
 
        # 胜率
        winning_days = (returns > 0).sum()
        total_days = (returns != 0).sum()
        win_rate = winning_days / total_days if total_days > 0 else 0
 
        # 交易次数
        trades = self.signals['position'].diff().abs().sum() / 2
 
        self.performance = {
            'total_return': total_return,
            'annual_return': annual_return,
            'annual_volatility': annual_vol,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'total_trades': trades
        }
 
    def plot_results(self):
        """可视化回测结果"""
        if self.signals is None or self.returns is None:
            raise ValueError("请先运行回测")
 
        fig, axes = plt.subplots(3, 1, figsize=(14, 10))
 
        # 1. 价格和协整关系
        ax1 = axes[0]
        ax1.plot(self.prices1.index, self.prices1.values,
                label=self.stock1_name, alpha=0.7)
        ax1.plot(self.prices2.index,
                self.cointegration_results['alpha'] +
                self.hedge_ratio * self.prices2.values,
                label=f"{self.stock2_name} (adjusted)", alpha=0.7)
        ax1.set_title('价格和协整关系')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
 
        # 2. 价差和信号
        ax2 = axes[1]
        ax2.plot(self.signals.index, self.signals['spread'].values,
                label='Spread', color='steelblue')
        ax2.fill_between(self.signals.index,
                         self.signals['spread'].mean() - 2*self.signals['spread'].std(),
                         self.signals['spread'].mean() + 2*self.signals['spread'].std(),
                         alpha=0.1, color='green', label='±2σ')
 
        long_positions = self.signals['position'] == 1
        short_positions = self.signals['position'] == -1
        ax2.scatter(self.signals.index[long_positions],
                   self.signals['spread'].values[long_positions],
                   color='green', marker='^', s=30, label='Long Spread', zorder=5)
        ax2.scatter(self.signals.index[short_positions],
                   self.signals['spread'].values[short_positions],
                   color='red', marker='v', s=30, label='Short Spread', zorder=5)
 
        ax2.set_title('价差和交易信号')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
 
        # 3. 累计收益
        ax3 = axes[2]
        cumulative_returns = (1 + self.returns).cumprod()
        ax3.plot(cumulative_returns.index, cumulative_returns.values,
                label='Strategy', linewidth=2, color='darkblue')
        ax3.axhline(y=1, color='black', linestyle='--', alpha=0.5)
        ax3.set_title('累计收益')
        ax3.set_ylabel('Cumulative Return')
        ax3.grid(True, alpha=0.3)
 
        # 添加绩效信息
        perf_text = (
            f"总收益: {self.performance['total_return']:.2%}\n"
            f"年化收益: {self.performance['annual_return']:.2%}\n"
            f"夏普比率: {self.performance['sharpe_ratio']:.2f}\n"
            f"最大回撤: {self.performance['max_drawdown']:.2%}\n"
            f"胜率: {self.performance['win_rate']:.1%}\n"
            f"交易次数: {self.performance['total_trades']:.0f}"
        )
        ax3.text(0.02, 0.1, perf_text, transform=ax3.transAxes,
                fontsize=10, verticalalignment='bottom',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
 
        plt.tight_layout()
        plt.show()
 
        return fig
 
    def print_summary(self):
        """打印策略摘要"""
        print("=" * 60)
        print("配对交易策略回测报告")
        print("=" * 60)
 
        print(f"\n配对: {self.stock1_name} vs {self.stock2_name}")
        print(f"对冲比率: {self.hedge_ratio:.4f}")
 
        print(f"\n协整检验:")
        print(f"  状态: {'✅ 协整' if self.cointegration_results['is_cointegrated'] else '❌ 不协整'}")
        print(f"  p 值: {self.cointegration_results['p_value']:.6f}")
 
        print(f"\n绩效指标:")
        for key, value in self.performance.items():
            if isinstance(value, float):
                if 'rate' in key or 'return' in key or 'drawdown' in key:
                    print(f"  {key}: {value:.2%}")
                else:
                    print(f"  {key}: {value:.4f}")
            else:
                print(f"  {key}: {value}")
 
        print("=" * 60)
 
 
# ============ 使用示例 ============
 
# 创建系统实例
system = PairsTradingSystem('Stock_A', 'Stock_B')
 
# 生成协整数据
prices1, prices2 = system.generate_cointegrated_data(
    n_periods=500,
    beta=1.5,
    alpha=10,
    noise_std=3,
    seed=42
)
 
# 协整检验
is_cointegrated, hedge_ratio, p_value = system.test_cointegration()
print(f"协整检验: {'通过' if is_cointegrated else '未通过'}, p 值: {p_value:.6f}")
 
# 生成信号
system.generate_signals(entry_threshold=2.0, exit_threshold=0.5, lookback=20)
 
# 回测
returns = system.backtest(transaction_cost=0.001)
 
# 可视化和报告
system.plot_results()
system.print_summary()

六、多配对组合

实际交易中会同时交易多个配对以分散风险。

class MultiPairsPortfolio:
    """
    多配对组合管理
 
    功能:
    1. 管理多个配对
    2. 资金分配
    3. 风险控制
    """
 
    def __init__(self, initial_capital=1000000):
        self.initial_capital = initial_capital
        self.pairs = []
        self.portfolio_returns = None
 
    def add_pair(self, stock1, stock2, hedge_ratio, spread, signals):
        """添加一个配对"""
        pair = {
            'stock1': stock1,
            'stock2': stock2,
            'hedge_ratio': hedge_ratio,
            'spread': spread,
            'signals': signals
        }
        self.pairs.append(pair)
 
    def calculate_portfolio_returns(self, prices_dict, weights=None):
        """
        计算组合收益
 
        参数:
            prices_dict: 价格字典 {stock_name: price_series}
            weights: 各配对权重(默认等权)
        """
        if not self.pairs:
            raise ValueError("没有添加任何配对")
 
        n_pairs = len(self.pairs)
 
        if weights is None:
            weights = np.ones(n_pairs) / n_pairs
 
        if len(weights) != n_pairs:
            raise ValueError("权重数量与配对数量不匹配")
 
        # 计算每个配对的收益
        pair_returns = []
 
        for pair in self.pairs:
            s1 = pair['stock1']
            s2 = pair['stock2']
            hedge = pair['hedge_ratio']
            position = pair['signals']['position']
 
            ret1 = prices_dict[s1].pct_change()
            ret2 = prices_dict[s2].pct_change()
 
            # 配对收益
            pair_ret = position.shift(1) * ret1 - position.shift(1) * hedge * ret2
            pair_returns.append(pair_ret)
 
        # 组合收益
        returns_df = pd.DataFrame(pair_returns).T
        self.portfolio_returns = returns_df.mul(weights, axis=1).sum(axis=1)
 
        return self.portfolio_returns
 
    def calculate_performance(self):
        """计算组合绩效"""
        if self.portfolio_returns is None:
            return None
 
        returns = self.portfolio_returns.dropna()
 
        metrics = {
            'total_return': (1 + returns).prod() - 1,
            'annual_return': returns.mean() * 252,
            'annual_vol': returns.std() * np.sqrt(252),
        }
        metrics['sharpe'] = (
            metrics['annual_return'] / metrics['annual_vol']
            if metrics['annual_vol'] > 0 else 0
        )
 
        # 最大回撤
        cumulative = (1 + returns).cumprod()
        running_max = cumulative.expanding().max()
        drawdown = (cumulative - running_max) / running_max
        metrics['max_drawdown'] = drawdown.min()
 
        return metrics
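多配对分散的效果可以用一个独立的小例子直观验证(以下收益序列为模拟数据,均值和波动率参数均为演示用的假设值):若各配对收益近似独立,等权组合的波动率约为单配对的 1/√n。

```python
import numpy as np
import pandas as pd

# 模拟两组相互独立的配对日收益(参数为演示假设值)
rng = np.random.default_rng(42)
n = 500
pair1 = pd.Series(rng.normal(0.0004, 0.01, n))
pair2 = pd.Series(rng.normal(0.0004, 0.01, n))

# 等权组合
portfolio = 0.5 * pair1 + 0.5 * pair2

ann_vol = lambda r: r.std() * np.sqrt(252)
print(f"配对1 年化波动: {ann_vol(pair1):.1%}")
print(f"配对2 年化波动: {ann_vol(pair2):.1%}")
print(f"组合   年化波动: {ann_vol(portfolio):.1%}")  # 两配对独立时约为单配对的 1/√2
```

实际配对之间往往存在相关性(尤其同行业配对),分散效果会打折扣,这也是配对选择时要兼顾行业多样性的原因。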

七、统计套利的风险

7.1 主要风险类型

| 风险类型 | 描述 | 缓解方法 |
| --- | --- | --- |
| 协整关系断裂 | 历史关系不再成立 | 定期重新检验,设置止损 |
| 制度转换 | 市场结构性变化 | 多元化配对,动态调整 |
| 执行成本 | 滑点、佣金、价差 | 优化交易时机,控制换手 |
| 模型风险 | 模型假设错误 | 压力测试,多个模型验证 |
| 流动性风险 | 无法按期望价格成交 | 选择流动性好的配对 |

7.2 风险管理实践

def risk_management_checks(current_spread, entry_spread, position,
                           holding_days, checks_config):
    """
    风险管理检查

    参数:
        current_spread: 当前价差
        entry_spread: 开仓时的价差
        position: 当前持仓方向(1=做多价差, -1=做空价差)
        holding_days: 已持仓天数
        checks_config: 风险参数配置

    返回:
        action: 建议操作 ('hold', 'close')
        reason: 原因说明
    """
    max_loss = checks_config.get('max_loss', 0.10)                # 最大亏损比例
    max_holding_days = checks_config.get('max_holding_days', 20)  # 最长持仓天数

    # 以开仓价差为基准、按持仓方向计算浮动盈亏
    pnl = position * (current_spread - entry_spread) / abs(entry_spread)

    if pnl < -max_loss:
        return 'close', f'触发止损: 亏损 {pnl:.2%}'

    if holding_days > max_holding_days:
        return 'close', f'超过最长持仓期: {holding_days} 天'

    return 'hold', '风险正常'
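max_holding_days 这类参数不必凭感觉设定:一个常见做法是用 AR(1) 回归估计价差的均值回归半衰期,再据此设置持仓上限。以下是一个最简实现(spread_half_life 为此处示意引入的辅助函数,用模拟的 OU 过程验证):

```python
import numpy as np
import pandas as pd

def spread_half_life(spread):
    """用 AR(1) 回归估计价差的均值回归半衰期(单位与采样频率一致)"""
    s = pd.Series(spread).dropna()
    delta = s.diff().dropna()          # Δs_t
    lag = s.shift(1).loc[delta.index]  # s_{t-1}
    # OLS: Δs = a + b * s_{t-1},b < 0 表示存在均值回归
    b, a = np.polyfit(lag.values, delta.values, 1)
    if b >= 0:
        return np.inf  # 无均值回归迹象,不适合配对交易
    return -np.log(2) / b

# 用模拟 OU 过程验证:theta=0.1 对应的理论半衰期约 6.9 个周期
np.random.seed(0)
n, theta = 2000, 0.1
s = np.zeros(n)
for t in range(1, n):
    s[t] = s[t-1] + theta * (0 - s[t-1]) + np.random.normal(0, 1)

hl = spread_half_life(s)
print(f"估计的价差半衰期: {hl:.1f} 天")
```

若估计出的半衰期远大于计划持仓期,说明该配对回归太慢,可以在配对筛选阶段直接剔除。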

八、ML 增强统计套利

8.1 LSTM 预测价差

from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
 
def prepare_lstm_data(spread, lookback=20):
    """
    准备 LSTM 训练数据
 
    参数:
        spread: 价差序列
        lookback: 输入窗口
 
    返回:
        X, y: 训练数据
    """
    # 实际使用时建议先用 MinMaxScaler 将价差归一化,这里为简洁直接使用原值
    data = spread.values.reshape(-1, 1)

    X, y = [], []
    for i in range(lookback, len(data)):
        X.append(data[i-lookback:i, 0])
        y.append(data[i, 0])

    # LSTM 输入要求形状为 (样本数, 时间步, 特征数)
    return np.array(X).reshape(-1, lookback, 1), np.array(y)
 
 
def build_lstm_model(lookback=20):
    """构建 LSTM 模型"""
    model = Sequential([
        LSTM(50, activation='relu', input_shape=(lookback, 1)),
        Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    return model
 
 
# 使用示例(需要安装 tensorflow)
"""
# 准备数据
X, y = prepare_lstm_data(spread_series, lookback=20)
 
# 划分训练集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
 
# 构建和训练模型
model = build_lstm_model(lookback=20)
model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=0)
 
# 预测
predictions = model.predict(X_test)
 
# 基于预测生成信号
# 如果预测价差会上涨 → 做多价差
# 如果预测价差会下跌 → 做空价差
"""
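"基于预测生成信号"这一步可以独立于模型来写:只要有当前价差和预测价差,就能按阈值规则转成仓位。下面是一个不依赖 TensorFlow 的最简示意(prediction_to_signal 和 threshold 均为此处假设的示例命名与参数):

```python
import numpy as np
import pandas as pd

def prediction_to_signal(current_spread, predicted_spread, threshold=0.5):
    """
    把预测价差转成交易信号(示意逻辑,threshold 为假设的最小预期变动):
    预测价差上涨超过阈值 → 做多价差 (+1)
    预测价差下跌超过阈值 → 做空价差 (-1)
    其余情况空仓 (0)
    """
    expected_move = predicted_spread - current_spread
    signal = np.where(expected_move > threshold, 1,
                      np.where(expected_move < -threshold, -1, 0))
    return pd.Series(signal, index=pd.Series(current_spread).index)

# 示例:三个时点的当前价差与预测价差
cur = pd.Series([0.0, 1.0, -1.0])
pred = pd.Series([1.0, 1.1, -2.0])
print(prediction_to_signal(cur, pred).tolist())  # [1, 0, -1]
```

阈值应结合交易成本设定:预期变动小于来回成本的信号不值得交易。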

8.2 动态对冲比率

传统方法使用固定对冲比率;更稳健的做法是动态调整。最简单的实现是滚动窗口回归,进一步可升级为 Kalman 滤波等在线估计方法:

def dynamic_hedge_ratio(price1, price2, window=60):
    """
    动态对冲比率
 
    使用滚动窗口回归,实时更新对冲比率
 
    参数:
        price1, price2: 价格序列
        window: 回归窗口
 
    返回:
        hedge_ratios: 动态对冲比率序列
    """
    hedge_ratios = pd.Series(index=price1.index, dtype=float)
 
    for i in range(window, len(price1)):
        y = price1.iloc[i-window:i].values
        x = price2.iloc[i-window:i].values
 
        # OLS 回归斜率 = Cov(x, y) / Var(x),ddof=1 与 np.cov 的默认自由度保持一致
        slope = np.cov(x, y)[0, 1] / np.var(x, ddof=1)
        hedge_ratios.iloc[i] = slope
 
    return hedge_ratios
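滚动回归的一个常见升级是 Kalman 滤波:把对冲比率当作隐状态做在线递推估计,对关系漂移的响应比固定窗口更平滑。下面是一维 Kalman 滤波的最简示意(delta、r 为需要调参的假设值,并非权威实现):

```python
import numpy as np
import pandas as pd

def kalman_hedge_ratio(price1, price2, delta=1e-4, r=1.0):
    """
    一维 Kalman 滤波递推估计对冲比率 beta:
        状态方程: beta_t = beta_{t-1} + w_t,        w ~ N(0, delta)
        观测方程: price1_t = beta_t * price2_t + v_t, v ~ N(0, r)
    delta 越大,beta 跟踪变化越快,但估计越不平滑
    """
    betas = np.zeros(len(price1))
    beta, P = 0.0, 1.0  # 初始状态与状态方差
    for t in range(len(price1)):
        P = P + delta                      # 预测:状态方差增长
        x, y = price2.iloc[t], price1.iloc[t]
        K = P * x / (x * x * P + r)        # Kalman 增益
        beta = beta + K * (y - beta * x)   # 用观测残差修正 beta
        P = (1 - K * x) * P                # 更新状态方差
        betas[t] = beta
    return pd.Series(betas, index=price1.index)

# 模拟验证:真实对冲比率为 1.5
rng = np.random.default_rng(1)
p2 = pd.Series(100 + np.cumsum(rng.normal(0, 1, 300)))
p1 = 1.5 * p2 + pd.Series(rng.normal(0, 3, 300))
betas = kalman_hedge_ratio(p1, p2, delta=1e-4, r=9.0)
print(f"最后一期估计的 beta: {betas.iloc[-1]:.3f}")  # 应收敛到 1.5 附近
```

与滚动回归相比,Kalman 滤波不需要选择窗口长度,代价是要为 delta 和 r 选择合理的量级。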

核心知识点总结

协整检验

  • Engle-Granger 两步法:适合两变量
  • Johansen 检验:适合多变量
  • 平稳性检验:ADF 检验验证残差平稳

配对选择

  • 距离法:价格走势相似度
  • 协整法:直接检验协整关系
  • 聚类法:基于特征分组

信号生成

  • Z-score:标准化价差
  • Bollinger Band:均值 ± 标准差带
  • 阈值设置:入场、出场、止损

风险管理

  • 协整关系断裂风险
  • 执行成本控制
  • 多配对分散风险

ML 增强

  • LSTM 预测价差方向
  • 动态对冲比率
  • 聚类发现配对

下一步

继续学习 03-信号到组合.md,了解如何将 alpha 信号转化为实际可交易的投资组合。