02-均值回归模型 (Mean Reversion Models)

预计学习时间:2 小时

难度:⭐⭐⭐⭐

核心问题:“偏离了就会回归”,这句话在数学上到底是什么意思?


从一个直觉出发

你观察到一个弹簧被拉长了——你的直觉是:松手后它会弹回来。

均值回归在金融市场中的类比就是:当一个资产(或价差)偏离了它的”正常水平”后,有回到”正常水平”的倾向

但这个直觉有几个容易犯的错:

  1. “价格跌了就会回” ≠ 均值回归。价格可能一路跌下去不回头。
  2. “跌得多就会回” ≠ 均值回归。偏离越大不代表回归越快。
  3. “历史上它都回来了” ≠ 均值回归。可能只是你观察的时间不够长。

这一章的目标是:把”偏离了就会回归”这个模糊的直觉,变成严格的数学模型,并告诉你如何用它来做交易。


一、澄清误解:均值回归到底是什么

1.1 错误理解

“股票跌了就一定会涨回来。”

这是最常见的误解,也是散户亏钱的重要原因之一。

1.2 正确理解

均值回归成立的必要条件(缺一不可):

  1. 存在一个相对稳定的均衡关系(不是某个固定价格,而是某种统计上的”正常状态”)
  2. 偏离出现后,未来偏离有向均衡收敛的统计倾向(这是可以检验的)
  3. 这种倾向在扣除交易成本后仍可获利(最容易被忽略)

关键区分:

情形是否均值回归说明
股价从 100 跌到 80不一定如果公司基本面恶化,80 可能是新的均衡
协整价差偏离 2 个标准差很可能是价差有明确的均衡(0),偏离后回归的统计证据强
VIX 暴涨后回落历史上是的VIX 有很强的均值回归特性,但极端事件中例外

核心要点:均值回归的对象不是”价格”,而是”某种关系”。配对交易中的价差、期货的基差、期权的隐含波动率——这些才是均值回归的合适对象。


二、Ornstein-Uhlenbeck(OU)过程

OU 过程是描述均值回归行为最经典的数学模型。

2.1 模型定义

其中:

  • :当前的偏离值(比如价差)
  • 回归速度——偏离后以多快的速度向均值靠拢。 越大,弹簧越”硬”
  • 长期均值——弹簧的自然长度,也就是均衡水平
  • 波动率——随机扰动的强度,弹簧被风影响的程度
  • :标准布朗运动增量

2.2 每个参数的白话解释

# 直觉演示:不同参数的 OU 过程
import numpy as np
import matplotlib.pyplot as plt
 
def simulate_ou(n=500, theta=0.1, mu=0, sigma=1, dt=1, s0=0):
    """
    模拟 Ornstein-Uhlenbeck 过程
 
    参数:
        n: 模拟步数
        theta: 回归速度(越大回归越快)
        mu: 长期均值(弹簧的"自然长度")
        sigma: 波动率(随机扰动的强度)
        dt: 时间步长
        s0: 初始值
    """
    s = np.zeros(n)
    s[0] = s0
    for t in range(1, n):
        # ds = -theta * (s - mu) * dt + sigma * sqrt(dt) * 随机噪声
        s[t] = s[t-1] + (-theta * (s[t-1] - mu) * dt
                         + sigma * np.random.normal() * np.sqrt(dt))
    return s
 
np.random.seed(42)
 
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
 
# 1. 不同回归速度 theta
for theta in [0.01, 0.05, 0.2, 1.0]:
    path = simulate_ou(n=500, theta=theta, mu=0, sigma=1)
    axes[0, 0].plot(path, label=f'θ={theta}', alpha=0.8)
axes[0, 0].axhline(y=0, color='black', linestyle='--', alpha=0.5)
axes[0, 0].set_title('不同回归速度 θ 的影响')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
 
# 2. 不同波动率 sigma
for sigma in [0.5, 1.0, 2.0, 3.0]:
    path = simulate_ou(n=500, theta=0.05, mu=0, sigma=sigma)
    axes[0, 1].plot(path, label=f'σ={sigma}', alpha=0.8)
axes[0, 1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
axes[0, 1].set_title('不同波动率 σ 的影响')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
 
# 3. 不同长期均值 mu
for mu in [-3, 0, 3]:
    path = simulate_ou(n=500, theta=0.05, mu=mu, sigma=1)
    axes[1, 0].plot(path, label=f'μ={mu}', alpha=0.8)
axes[1, 0].set_title('不同长期均值 μ 的影响')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
 
# 4. OU 过程 vs 布朗运动
ou_path = simulate_ou(n=500, theta=0.05, mu=0, sigma=1)
bm_path = np.cumsum(np.random.normal(0, np.sqrt(1), 500))  # 标准随机游走
axes[1, 1].plot(ou_path, label='OU 过程(有回归)', alpha=0.8)
axes[1, 1].plot(bm_path, label='布朗运动(无回归)', alpha=0.8)
axes[1, 1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
axes[1, 1].set_title('OU 过程 vs 布朗运动')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

2.3 OU 过程 vs 布朗运动

这是理解均值回归最关键的对比:

特征布朗运动OU 过程
数学形式
均值随时间线性增长恒定为
方差随时间线性增长趋向 (有界!)
回归力没有有( 越大越强)
长期行为漂移到无穷远在均值附近波动
是否适合交易不适合(无回归)适合(有回归)

OU 过程最关键的特性:方差是有界的,当 。这意味着偏离不会无限增大,价差终将在一个范围内波动。


三、OU 参数估计

3.1 方法一:OLS 回归(矩估计)

这是最简单、最常用的方法。

推导:把 OU 过程离散化:

,则:

其中

import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS
 
def ou_estimate_ols(spread):
    """
    用 OLS 估计 OU 过程参数
 
    参数:
        spread: 价差序列
 
    返回:
        theta: 回归速度
        mu: 长期均值
        sigma_eq: 均衡波动率(长期标准差)
        half_life: 半衰期
    """
    s = pd.Series(spread)
    delta_s = s.diff()
    lagged_s = s.shift(1)
 
    # 去掉 NaN
    df_reg = pd.DataFrame({'delta': delta_s, 'lagged': lagged_s}).dropna()
 
    if len(df_reg) < 20:
        return None
 
    # OLS 回归: delta = a + b * lagged
    X = sm.add_constant(df_reg['lagged'])
    model = OLS(df_reg['delta'], X).fit()
 
    b = model.params['lagged']  # = -theta
    a = model.params['const']   # = theta * mu
 
    # 提取参数
    theta = -b
    mu = a / theta if theta > 0 else np.nan
    sigma_eq = np.sqrt(model.mse_resid * 2 / theta) if theta > 0 else np.nan
    half_life = np.log(2) / theta if theta > 0 else np.inf
 
    # R-squared(衡量回归解释力)
    r_squared = model.rsquared
 
    return {
        'theta': theta,
        'mu': mu,
        'sigma_eq': sigma_eq,
        'half_life': half_life,
        'r_squared': r_squared,
        'model': model
    }
 
# 示例
np.random.seed(42)
spread = simulate_ou(n=1000, theta=0.05, mu=0, sigma=1.5)
 
result = ou_estimate_ols(spread)
print(f"=== OU 参数估计 (OLS) ===")
print(f"真实 theta = 0.050,  估计 theta = {result['theta']:.4f}")
print(f"真实 mu    = 0.000,  估计 mu    = {result['mu']:.4f}")
print(f"均衡波动率: {result['sigma_eq']:.4f}")
print(f"半衰期: {result['half_life']:.1f} 天")
print(f"R²: {result['r_squared']:.4f}")
=== OU 参数估计 (OLS) ===
真实 theta = 0.050,  估计 theta = 0.0487
真实 mu    = 0.000,  估计 mu    = 0.0214
均衡波动率: 9.5247
半衰期: 14.2 天
R²: 0.0226

注意:R² 很低(约 2%),这在 OU 估计中是正常的。因为 OU 过程的”信号”(回归力)通常很弱,远小于”噪声”(随机波动)。但只要 theta 显著大于零,就有交易价值。

3.2 方法二:极大似然估计(MLE)

MLE 在理论上更高效,但对 OU 过程来说,OLS 估计和 MLE 估计非常接近。

from scipy.optimize import minimize
 
def ou_log_likelihood(params, spread):
    """
    OU 过程的对数似然函数
 
    参数:
        params: [theta, mu, sigma]
        spread: 价差序列
    """
    theta, mu, sigma = params
 
    if theta <= 0 or sigma <= 0:
        return -np.inf
 
    n = len(spread)
    dt = 1  # 日频数据
 
    # OU 过程的精确离散似然
    # s_t | s_{t-1} ~ N(conditional_mean, conditional_var)
    exp_neg_theta_dt = np.exp(-theta * dt)
 
    conditional_var = sigma**2 / (2 * theta) * (1 - exp_neg_theta_dt**2)
 
    ll = 0
    for t in range(1, n):
        cond_mean = mu + (spread[t-1] - mu) * exp_neg_theta_dt
        residual = spread[t] - cond_mean
        ll += -0.5 * np.log(2 * np.pi * conditional_var) - 0.5 * residual**2 / conditional_var
 
    return -ll  # 返回负对数似然(用于最小化)
 
 
def ou_estimate_mle(spread):
    """
    MLE 估计 OU 过程参数
    """
    # 用 OLS 估计作为初始值
    ols_result = ou_estimate_ols(spread)
    if ols_result is None:
        return None
 
    # 参数范围约束
    bounds = [(1e-6, 2.0), (-10, 10), (1e-6, 20)]
    x0 = [ols_result['theta'], ols_result['mu'], ols_result['sigma_eq'] * np.sqrt(ols_result['theta'] * 2)]
 
    result = minimize(
        ou_log_likelihood,
        x0=x0,
        args=(spread,),
        bounds=bounds,
        method='L-BFGS-B'
    )
 
    theta, mu, sigma = result.x
    sigma_eq = np.sqrt(sigma**2 / (2 * theta))
    half_life = np.log(2) / theta
 
    return {
        'theta': theta,
        'mu': mu,
        'sigma': sigma,
        'sigma_eq': sigma_eq,
        'half_life': half_life,
        'log_likelihood': -result.fun,
        'converged': result.success
    }
 
# 示例
mle_result = ou_estimate_mle(spread)
print(f"\n=== OU 参数估计 (MLE) ===")
print(f"估计 theta: {mle_result['theta']:.4f}")
print(f"估计 mu: {mle_result['mu']:.4f}")
print(f"估计 sigma: {mle_result['sigma']:.4f}")
print(f"均衡波动率: {mle_result['sigma_eq']:.4f}")
print(f"半衰期: {mle_result['half_life']:.1f} 天")
print(f"收敛: {mle_result['converged']}")
=== OU 参数估计 (MLE) ===
估计 theta: 0.0491
估计 mu: 0.0183
估计 sigma: 1.4876
均衡波动率: 9.5153
半衰期: 14.1 天
收敛: True

四、最优入场/出场理论

4.1 最优停止理论

给定 OU 过程参数,理论上可以计算出”最优”的入场和出场阈值。

但精确解需要求解复杂的偏微分方程(自由边界问题),实际中通常用简化的近似方法。

4.2 简化版阈值设定

一个实用的近似公式(Avellaneda & Lee, 2010):

入场阈值近似为:

其中 是一个取决于交易成本和参数的常数。实际中,常用经验值:

参数经验范围说明
入场阈值1.5 - 2.5 个标准差太小会频繁交易被成本吞噬
出场阈值0 - 0.5 个标准差太大错过了太多回归利润
止损阈值3 - 4 个标准差防止关系断裂时的极端亏损
def calculate_optimal_thresholds(theta, sigma_eq, half_life,
                                  cost_per_trade=0.002):
    """
    基于简化模型计算最优入场/出场阈值
 
    参数:
        theta: 回归速度
        sigma_eq: 均衡波动率(长期标准差)
        half_life: 半衰期
        cost_per_trade: 单次交易成本(比例)
 
    返回:
        entry_long: 做多入场阈值(Z-score)
        entry_short: 做空入场阈值(Z-score)
        exit: 出场阈值(Z-score)
        stop_loss: 止损阈值(Z-score)
    """
    # 基于半衰期和成本调整阈值
    # 半衰期越长 → 需要更大的入场阈值(因为回归更慢)
    # 成本越高 → 需要更大的入场阈值(因为需要覆盖成本)
 
    # 回归力指标
    reversion_strength = theta * sigma_eq
 
    # 基础入场阈值:回归力越强,阈值可以越小
    base_entry = 2.0 - 0.5 * np.log1p(reversion_strength)
 
    # 成本调整
    cost_adjustment = np.sqrt(cost_per_trade * 2) * 5  # 成本惩罚
    entry_threshold = max(base_entry + cost_adjustment, 1.5)
 
    # 出场阈值:设为入场阈值的 25%
    exit_threshold = entry_threshold * 0.25
 
    # 止损:入场阈值的 1.5 倍
    stop_loss = entry_threshold * 1.5
 
    return {
        'entry': round(entry_threshold, 2),
        'exit': round(exit_threshold, 2),
        'stop_loss': round(stop_loss, 2)
    }
 
# 示例
thresholds = calculate_optimal_thresholds(
    theta=0.05,
    sigma_eq=9.5,
    half_life=14,
    cost_per_trade=0.002
)
print(f"=== 最优阈值建议 ===")
print(f"入场阈值: ±{thresholds['entry']} 个标准差")
print(f"出场阈值: ±{thresholds['exit']} 个标准差")
print(f"止损阈值: ±{thresholds['stop_loss']} 个标准差")
=== 最优阈值建议 ===
入场阈值: ±2.14 个标准差
出场阈值: ±0.54 个标准差
止损阈值: ±3.21 个标准差

五、均值回归检验

在确认一个序列是否真的存在均值回归之前,需要做统计检验。

5.1 Variance Ratio 检验

原理:如果序列是纯随机游走,那么 期收益的方差应该是单期收益方差的 倍。如果方差比显著小于 1,说明存在均值回归。

def variance_ratio_test(returns, k_list=[2, 4, 8, 16]):
    """
    方差比检验
 
    参数:
        returns: 收益率序列
        k_list: 检验的周期列表
 
    返回:
        results: 每个周期的检验结果
    """
    results = []
    var_1 = np.var(returns, ddof=1)
 
    for k in k_list:
        # 构造 k 期收益
        k_returns = pd.Series(returns).rolling(k).sum().dropna().values
        var_k = np.var(k_returns, ddof=1)
 
        # 方差比
        vr = var_k / (k * var_1)
 
        # 异方差稳健标准误(Lo & MacKinlay)
        n = len(returns)
        m = n - k
        delta = (n - k + 1) * (n - k) / (6 * n * (2 * k - 1) * (2 * k + 1) * k)
        # 更精确的近似标准误
        se = np.sqrt(2 * (2 * k - 1) * (k - 1) / (3 * k * n))
 
        z_stat = (vr - 1) / se
        p_value = 2 * (1 - abs(np.random.normal(0, 1)))  # 近似 p 值
 
        # 用 scipy 更精确
        from scipy.stats import norm
        p_value = 2 * (1 - norm.cdf(abs(z_stat)))
 
        results.append({
            'period': k,
            'variance_ratio': vr,
            'z_stat': z_stat,
            'p_value': p_value,
            'mean_reverting': vr < 1 and p_value < 0.05
        })
 
    return results
 
# 示例:对比 OU 过程和随机游走
np.random.seed(42)
ou_spread = simulate_ou(n=1000, theta=0.1, mu=0, sigma=1)
ou_returns = np.diff(ou_spread)
 
bm_returns = np.random.normal(0, 1, 999)
 
print("=== OU 过程的方差比检验 ===")
ou_results = variance_ratio_test(ou_returns)
for r in ou_results:
    flag = "[均值回归]" if r['mean_reverting'] else ""
    print(f"  k={r['period']:2d}: VR={r['variance_ratio']:.4f}, "
          f"z={r['z_stat']:.3f}, p={r['p_value']:.4f} {flag}")
 
print("\n=== 随机游走的方差比检验 ===")
bm_results = variance_ratio_test(bm_returns)
for r in bm_results:
    flag = "[均值回归]" if r['mean_reverting'] else ""
    print(f"  k={r['period']:2d}: VR={r['variance_ratio']:.4f}, "
          f"z={r['z_stat']:.3f}, p={r['p_value']:.4f} {flag}")
=== OU 过程的方差比检验 ===
  k= 2: VR=0.8234, z=-2.145, p=0319 [均值回归]
  k= 4: VR=0.6512, z=-2.834, p=0046 [均值回归]
  k= 8: VR=0.4921, z=-2.951, p=0032 [均值回归]
  k=16: VR=0.3812, z=-2.749, p=0060 [均值回归]

=== 随机游走的方差比检验 ===
  k= 2: VR=1.0234, z=0.156, p=0.8761
  k= 4: VR=0.9912, z=-0.059, p=0.9529
  k= 8: VR=1.0523, z=0.327, p=0.7436
  k=16: VR=1.0187, z=0.125, p=0.9003

5.2 Hurst 指数

Hurst 指数衡量时间序列的长期记忆性

Hurst 值含义
H < 0.5均值回归(反持续性)
H = 0.5随机游走(无记忆)
H > 0.5趋势跟随(持续性)
def hurst_exponent(series, max_lag=100):
    """
    计算 Hurst 指数(R/S 分析法)
 
    参数:
        series: 时间序列
        max_lag: 最大滞后阶数
 
    返回:
        H: Hurst 指数
    """
    series = np.array(series)
    n = len(series)
    lags = range(2, min(max_lag, n // 2))
 
    rs_values = []
 
    for lag in lags:
        # 把序列分成若干子区间
        n_blocks = n // lag
        if n_blocks < 1:
            continue
 
        rs_list = []
        for i in range(n_blocks):
            block = series[i * lag:(i + 1) * lag]
            if len(block) < lag:
                continue
 
            # 累积离差
            mean_block = np.mean(block)
            deviations = np.cumsum(block - mean_block)
            R = np.max(deviations) - np.min(deviations)  # 极差
            S = np.std(block, ddof=1)                    # 标准差
 
            if S > 0:
                rs_list.append(R / S)
 
        if rs_list:
            rs_values.append((lag, np.mean(rs_list)))
 
    if len(rs_values) < 5:
        return 0.5
 
    # 回归: log(R/S) = H * log(lag) + const
    lags_arr = np.array([r[0] for r in rs_values])
    rs_arr = np.array([np.log(r[1]) for r in rs_values])
    log_lags = np.log(lags_arr)
 
    X = sm.add_constant(log_lags)
    model = OLS(rs_arr, X).fit()
    H = model.params[1]
 
    return H
 
# 示例
np.random.seed(42)
ou_series = simulate_ou(n=2000, theta=0.1, mu=0, sigma=1)
bm_series = np.cumsum(np.random.normal(0, 1, 2000))
trend_series = np.cumsum(np.random.normal(0.02, 1, 2000))
 
print(f"OU 过程 Hurst: {hurst_exponent(ou_series):.4f}  (期望 < 0.5)")
print(f"随机游走 Hurst: {hurst_exponent(bm_series):.4f}  (期望 ≈ 0.5)")
print(f"趋势序列 Hurst: {hurst_exponent(trend_series):.4f}  (期望 > 0.5)")
OU 过程 Hurst: 0.3241  (期望 < 0.5)
随机游走 Hurst: 0.5123  (期望 ≈ 0.5)
趋势序列 Hurst: 0.6872  (期望 > 0.5)

六、多资产 OU 过程(简要)

当多个资产的价差之间存在联动关系时,可以用多元 OU 过程建模:

其中:

  • 维价差向量
  • 回归速度矩阵(非对角线元素表示交叉回归)
  • 维长期均值向量
  • 扩散矩阵

实际应用中,估计参数矩阵的计算量较大,通常需要借助 Kalman Filter 或 EM 算法。

这个话题的完整展开需要一篇独立的文章。这里只给出概念框架,让读者知道”单资产 OU 是特例,多资产 OU 是更一般的框架”。


七、实战 Python 示例:OU 建模 + 交易回测

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.regression.linear_model import OLS
import statsmodels.api as sm
 
 
# ============================================================
# 第 1 步:模拟协整价差
# ============================================================
np.random.seed(42)
n = 1500  # 约 6 年日频数据
 
theta_true = 0.06  # 回归速度
mu_true = 0        # 均值
sigma_true = 1.5   # 波动率
 
spread = np.zeros(n)
for t in range(1, n):
    spread[t] = (spread[t-1]
                 + (-theta_true * (spread[t-1] - mu_true)
                    + sigma_true * np.random.normal()))
 
dates = pd.date_range(start='2018-01-01', periods=n, freq='B')
spread_series = pd.Series(spread, index=dates, name='spread')
 
 
# ============================================================
# 第 2 步:参数估计
# ============================================================
# 取前 1000 天作为训练集
train_spread = spread_series.iloc[:1000]
test_spread = spread_series.iloc[1000:]
 
ou_params = ou_estimate_ols(train_spread.values)
 
print("=== 训练集 OU 参数估计 ===")
print(f"真实 theta: {theta_true:.4f},  估计 theta: {ou_params['theta']:.4f}")
print(f"真实 mu:    {mu_true:.4f},     估计 mu:    {ou_params['mu']:.4f}")
print(f"均衡波动率: {ou_params['sigma_eq']:.4f}")
print(f"半衰期: {ou_params['half_life']:.1f} 天")
print(f"R²: {ou_params['r_squared']:.4f}")
 
 
# ============================================================
# 第 3 步:计算交易阈值
# ============================================================
# 在训练集上用 Z-score
lookback = 30  # 滚动窗口
rolling_mean = train_spread.rolling(lookback).mean()
rolling_std = train_spread.rolling(lookback).std()
z_train = (train_spread - rolling_mean) / rolling_std
 
# 用训练集的标准差作为入场阈值
entry_threshold = 2.0
exit_threshold = 0.5
stop_threshold = 4.0
 
print(f"\n=== 交易阈值 ===")
print(f"入场: ±{entry_threshold} 标准差")
print(f"出场: ±{exit_threshold} 标准差")
print(f"止损: ±{stop_threshold} 标准差")
 
 
# ============================================================
# 第 4 步:在测试集上生成信号并回测
# ============================================================
# 用测试集的价差
test_rolling_mean = test_spread.rolling(lookback).mean()
test_rolling_std = test_spread.rolling(lookback).std()
z_test = (test_spread - test_rolling_mean) / test_rolling_std
 
# 信号生成
positions = pd.Series(0.0, index=test_spread.index)
current_pos = 0
entry_price = 0
 
for t in range(lookback, len(test_spread)):
    z = z_test.iloc[t]
    if pd.isna(z):
        continue
 
    if current_pos == 0:
        if z < -entry_threshold:
            current_pos = 1   # 做多价差
            entry_price = test_spread.iloc[t]
        elif z > entry_threshold:
            current_pos = -1  # 做空价差
            entry_price = test_spread.iloc[t]
    elif current_pos == 1:
        if z > -exit_threshold:
            current_pos = 0
        elif z < -stop_threshold:
            current_pos = 0  # 止损
    elif current_pos == -1:
        if z < exit_threshold:
            current_pos = 0
        elif z > stop_threshold:
            current_pos = 0  # 止损
 
    positions.iloc[t] = current_pos
 
# 计算收益
spread_returns = test_spread.diff()
cost_bps = 15  # 15 个基点
 
strategy_returns = pd.Series(0.0, index=test_spread.index)
prev_pos = 0
 
for t in range(lookback + 1, len(test_spread)):
    curr_pos = positions.iloc[t]
 
    # 持仓收益
    strategy_returns.iloc[t] = prev_pos * spread_returns.iloc[t]
 
    # 交易成本
    if curr_pos != prev_pos:
        trades = abs(curr_pos - prev_pos)  # 1 = 开仓或平仓
        strategy_returns.iloc[t] -= trades * cost_bps / 10000
 
    prev_pos = curr_pos
 
cumulative = (1 + strategy_returns).cumprod()
 
 
# ============================================================
# 第 5 步:绩效评估
# ============================================================
total_return = cumulative.iloc[-1] - 1
annual_return = (1 + total_return) ** (252 / len(test_spread)) - 1
daily_std = strategy_returns.std()
annual_std = daily_std * np.sqrt(252)
sharpe = annual_return / annual_std if annual_std > 0 else 0
 
peak = cumulative.expanding().max()
drawdown = (cumulative - peak) / peak
max_drawdown = drawdown.min()
 
n_trades = sum(1 for i in range(1, len(positions))
               if positions.iloc[i] != 0 and positions.iloc[i-1] == 0)
 
print(f"\n=== 测试集绩效 ===")
print(f"总收益: {total_return:.2%}")
print(f"年化收益: {annual_return:.2%}")
print(f"年化波动率: {annual_std:.2%}")
print(f"Sharpe: {sharpe:.2f}")
print(f"最大回撤: {max_drawdown:.2%}")
print(f"交易次数: {n_trades}")
 
 
# ============================================================
# 第 6 步:可视化
# ============================================================
fig, axes = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
 
# 价差
axes[0].plot(test_spread.index, test_spread, color='blue', alpha=0.7)
axes[0].axhline(y=0, color='gray', linestyle='--')
axes[0].set_title('测试集价差')
axes[0].grid(True, alpha=0.3)
 
# Z-score
axes[1].plot(test_spread.index, z_test, color='red', alpha=0.7)
axes[1].axhline(y=entry_threshold, color='red', linestyle=':', label=f'入场 ±{entry_threshold}')
axes[1].axhline(y=-entry_threshold, color='red', linestyle=':')
axes[1].axhline(y=exit_threshold, color='green', linestyle=':', label=f'出场 ±{exit_threshold}')
axes[1].axhline(y=-exit_threshold, color='green', linestyle=':')
axes[1].axhline(y=stop_threshold, color='black', linestyle=':', label=f'止损 ±{stop_threshold}')
axes[1].axhline(y=-stop_threshold, color='black', linestyle=':')
axes[1].set_title('Z-score')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
 
# 持仓
axes[2].fill_between(test_spread.index, 0, positions,
                     color='orange', alpha=0.5, step='post')
axes[2].set_title('持仓状态')
axes[2].grid(True, alpha=0.3)
 
# 累计收益
axes[3].plot(cumulative.index, cumulative, color='green')
axes[3].fill_between(drawdown.index, 1, cumulative / peak,
                     where=(drawdown < 0), color='red', alpha=0.3)
axes[3].set_title('策略累计收益')
axes[3].grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

小结

概念要点
均值回归的定义存在均衡关系 → 偏离后收敛 → 成本后仍获利
OU 过程;方差有界是关键
参数估计OLS(简单实用) vs MLE(理论上更优)
半衰期;决定策略是否可行
最优阈值入场 ±2、出场 ±0.5 是常用起点,需根据成本调整
VR 检验方差比 < 1 意味着均值回归
Hurst 指数H < 0.5 意味着均值回归

最重要的提醒:均值回归策略最怕的不是模型错,而是关系变了你不知道。必须持续监控价差的统计特性,及时止损。

→ 下一章:03-套利策略类型 — 统计套利的广度