时间序列分析

预计学习时间:3-4 小时

难度:⭐⭐⭐

核心问题:金融数据天然有序、时间依赖,如何用统计工具捕捉这种动态结构?


概述

时间序列分析是量化投资中不可或缺的数学工具。与截面数据不同,金融时间序列具有时序依赖——今天的收益率受昨天的影响,波动率存在聚集效应,多个价格序列之间可能存在长期均衡关系。

本节内容涵盖:

  • 平稳性与检验:ADF 检验
  • ACF/PACF:自相关函数与 Ljung-Box 检验
  • ARIMA/SARIMA:经典自回归建模
  • GARCH 族:波动率聚类建模
  • 协整:Engle-Granger、Johansen、配对交易
  • Granger 因果:领先滞后分析
  • Kalman 滤波器:时变参数估计
  • HMM:市场状态识别
  • 谱分析与 FFT:频域分析

一、平稳性与 ADF 检验

1.1 平稳性定义

平稳性类型定义金融示例
弱平稳均值、方差、自协方差恒定对数收益率近似弱平稳
差分平稳一阶差分后平稳大多数股票价格序列

1.2 ADF 检验

原假设 :存在单位根(非平稳)。ADF 回归方程:

核心判断: 是否显著不为零。

import numpy as np
from statsmodels.tsa.stattools import adfuller
 
np.random.seed(42)
n = 500
 
# 生成三种序列
stationary = np.random.normal(0, 1, n)                          # 白噪声(平稳)
random_walk = np.cumsum(np.random.normal(0, 1, n))              # 随机游走(非平稳)
drift_walk = np.cumsum(np.random.normal(0.01, 1, n))            # 带漂移(非平稳)
 
def run_adf(series, name):
    """执行 ADF 检验并打印结果"""
    result = adfuller(series, autolag='AIC')
    print(f"\n{'='*50}")
    print(f"ADF 检验: {name}")
    print(f"  ADF 统计量: {result[0]:.4f}")
    print(f"  p 值:       {result[1]:.6f}")
    conclusion = "平稳" if result[1] < 0.05 else "非平稳"
    print(f"  >>> 结论: 序列【{conclusion}】")
 
run_adf(stationary, "白噪声")
run_adf(random_walk, "随机游走")
run_adf(drift_walk, "带漂移随机游走")

二、ACF / PACF 与 Ljung-Box 检验

2.1 ACF 与 PACF

ACF,衡量序列与滞后值的线性相关。

PACF:在控制中间滞后影响后的相关性。ACF 是”总效应”,PACF 是”直接效应”。

2.2 用 ACF/PACF 识别 ARIMA 阶数

模型ACF 特征PACF 特征
AR(p)拖尾(指数衰减)p 阶后截尾
MA(q)q 阶后截尾拖尾(指数衰减)
ARMA(p,q)拖尾拖尾

2.3 Ljung-Box 检验

:前 阶自相关全部为零(纯随机序列)。

from statsmodels.tsa.stattools import acf, pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
# 生成 AR(2) 过程:y_t = 0.7*y_{t-1} + 0.2*y_{t-2} + noise
n = 500
noise = np.random.normal(0, 1, n)
ar_process = np.zeros(n)
for t in range(2, n):
    ar_process[t] = 0.7 * ar_process[t-1] + 0.2 * ar_process[t-2] + noise[t]
 
# 计算 ACF 和 PACF
acf_vals, acf_ci = acf(ar_process, nlags=20, alpha=0.05)
pacf_vals, pacf_ci = pacf(ar_process, nlags=20, alpha=0.05)
 
# 绘图
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
lags = np.arange(21)
 
axes[0].bar(lags, acf_vals, color='steelblue', alpha=0.7)
axes[0].fill_between(lags, acf_ci[:, 0], acf_ci[:, 1], alpha=0.2, color='steelblue')
axes[0].axhline(y=0, color='black', linewidth=0.5)
axes[0].set_title('自相关函数 (ACF)')
 
axes[1].bar(lags, pacf_vals, color='darkorange', alpha=0.7)
axes[1].fill_between(lags, pacf_ci[:, 0], pacf_ci[:, 1], alpha=0.2, color='darkorange')
axes[1].axhline(y=0, color='black', linewidth=0.5)
axes[1].set_title('偏自相关函数 (PACF)')
 
plt.tight_layout()
plt.show()
 
# Ljung-Box 检验
lb_result = acorr_ljungbox(ar_process, lags=[5, 10, 15], return_df=True)
print("Ljung-Box 检验(AR(2)过程,应显著):")
print(lb_result)
 
lb_white = acorr_ljungbox(noise, lags=[5, 10, 15], return_df=True)
print("\nLjung-Box 检验(白噪声,应不显著):")
print(lb_white)

三、ARIMA / SARIMA

3.1 ARIMA(p,d,q)

组件名称含义
AR(p)自回归
I(d)差分对序列做 d 阶差分
MA(q)移动平均

3.2 参数选择与建模

from statsmodels.tsa.arima.model import ARIMA
import warnings
warnings.filterwarnings('ignore')
 
np.random.seed(42)
n = 1000
 
# 模拟 ARIMA(2,0,1) 数据
from scipy.signal import lfilter
ar_params = [1, -0.6, -0.2]
ma_params = [1, 0.4]
simulated = lfilter(ma_params, ar_params, np.random.normal(0, 1, n))
 
# 网格搜索最优参数(基于 AIC)
def select_arima(series, max_p=3, max_d=1, max_q=3):
    """AIC 网格搜索最优 ARIMA 参数"""
    best_aic, best_order = np.inf, None
    results = []
    for p in range(max_p + 1):
        for d in range(max_d + 1):
            for q in range(max_q + 1):
                try:
                    model = ARIMA(series, order=(p, d, q))
                    fitted = model.fit()
                    results.append({'order': (p, d, q), 'aic': fitted.aic})
                    if fitted.aic < best_aic:
                        best_aic = fitted.aic
                        best_order = (p, d, q)
                except Exception:
                    continue
    results.sort(key=lambda x: x['aic'])
    print("Top 5 参数(按 AIC):")
    for r in results[:5]:
        print(f"  {r['order']} → AIC={r['aic']:.2f}")
    print(f"最优: {best_order}, AIC={best_aic:.2f}")
    return best_order
 
best_order = select_arima(simulated)
 
# 拟合并预测
model = ARIMA(simulated, order=best_order)
result = model.fit()
forecast = result.forecast(steps=20)
print(f"\n未来 5 期预测: {np.round(forecast[:5], 4)}")

3.3 SARIMA

对含季节性的数据使用 SARIMA(p,d,q)(P,D,Q,s)

# SARIMA 示例:季度数据
t = np.arange(400)
seasonal_data = 0.5 * np.sin(2 * np.pi * t / 4) + 0.001 * t + np.random.normal(0, 0.3, 400)
 
sarima = ARIMA(seasonal_data, order=(1,0,0), seasonal_order=(1,0,0,4))
sarima_result = sarima.fit()
print(f"SARIMA(1,0,0)(1,0,0,4) AIC: {sarima_result.aic:.2f}")

四、GARCH 族

4.1 波动率聚类

大幅波动后跟随大幅波动,小幅波动后跟随小幅波动——这就是波动率聚类

4.2 GARCH(1,1)

平稳条件:。波动率半衰期 =

参数金融含义
新信息(冲击)的反应速度
波动率的持续性
衰减速度,越接近 1 衰减越慢

4.3 GARCH 模拟与估计

import numpy as np
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
def simulate_garch(omega, alpha, beta, n=2000):
    """模拟 GARCH(1,1) 过程"""
    assert alpha + beta < 1
    returns = np.zeros(n)
    vols = np.zeros(n)
    vols[0] = np.sqrt(omega / (1 - alpha - beta))
    for t in range(1, n):
        vols[t] = np.sqrt(omega + alpha * returns[t-1]**2 + beta * vols[t-1]**2)
        returns[t] = np.random.normal(0, vols[t])
    return returns, vols
 
# 模拟 GARCH(1,1)
omega, alpha, beta = 0.00001, 0.08, 0.90
returns, vols = simulate_garch(omega, alpha, beta)
 
# 绘图
fig, axes = plt.subplots(2, 1, figsize=(14, 7), sharex=True)
axes[0].plot(returns, linewidth=0.5, color='steelblue')
axes[0].set_title('GARCH(1,1) 模拟收益率')
axes[0].set_ylabel('收益率')
axes[1].plot(vols**2, linewidth=0.8, color='darkorange')
axes[1].set_title('条件方差(波动率聚类)')
axes[1].set_ylabel('方差')
axes[1].set_xlabel('时间')
plt.tight_layout()
plt.show()
 
print(f"参数: ω={omega}, α={alpha}, β={beta}")
print(f"无条件方差: {omega / (1-alpha-beta):.6f}")
print(f"波动率半衰期: {np.log(0.5)/np.log(alpha+beta):.1f} 天")
 
# 使用 arch 库估计
try:
    from arch import arch_model
    am = arch_model(returns * 100, vol='Garch', p=1, q=1)
    fit = am.fit(disp='off')
    print(f"\n估计结果:")
    print(f"  ω = {fit.params['omega']:.6f}")
    print(f"  α = {fit.params['alpha[1]']:.4f}")
    print(f"  β = {fit.params['beta[1]']:.4f}")
    print(f"  α+β = {fit.params['alpha[1]']+fit.params['beta[1]']:.4f}")
except ImportError:
    print("需安装 arch 库: pip install arch")

4.4 GARCH 变体

模型特点适用场景
GARCH(1,1)基准模型大多数场景
EGARCH对称性/杠杆效应负冲击产生更大波动
GJR-GARCH门限效应同上
GARCH-M波动率影响均值风险溢价建模

五、协整与配对交易

5.1 协整

两个非平稳序列 , 若存在线性组合 平稳,则称它们协整

金融含义:两个价格虽然各自随机游走,但存在长期均衡,偏离后会均值回复

5.2 Engle-Granger 两步法

from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
 
np.random.seed(42)
n = 500
 
# 模拟协整序列: Y = 0.8*X + 平稳噪声
X = np.cumsum(np.random.normal(0.01, 0.5, n)) + 100
spread_noise = np.random.normal(0, 1, n)
Y = 0.8 * X + spread_noise
 
# 第一步:OLS 回归
X_aug = np.column_stack([np.ones(n), X])
beta_hat = np.linalg.lstsq(X_aug, Y, rcond=None)[0]
print(f"OLS: 截距={beta_hat[0]:.4f}, β={beta_hat[1]:.4f}")
 
# 第二步:对残差做 ADF 检验
residuals = Y - beta_hat[0] - beta_hat[1] * X
adf_p = adfuller(residuals, autolag='AIC')[1]
print(f"残差 ADF p 值: {adf_p:.6f}{'协整存在' if adf_p < 0.05 else '不显著'}")
 
# 直接使用 coint 函数
stat, p, crit = coint(X, Y)
print(f"直接 coint 检验 p 值: {p:.6f}")
 
# 非协整对照
X2 = np.cumsum(np.random.normal(0, 1, n))
Y2 = np.cumsum(np.random.normal(0, 1, n))
_, p_nc = coint(X2, Y2)
print(f"非协整序列 p 值: {p_nc:.4f} (应 > 0.05)")

5.3 Johansen 检验(多序列协整)

try:
    from statsmodels.tsa.vector_ar.vecm import coint_johansen
 
    n = 500
    common = np.cumsum(np.random.normal(0.01, 0.5, n))
    data = np.column_stack([
        common + np.random.normal(0, 1, n),
        1.5 * common + np.random.normal(0, 1, n),
        0.7 * common + np.random.normal(0, 1, n)
    ])
 
    joh = coint_johansen(data, det_order=0, k_ar_diff=2)
    print("Johansen 迹检验:")
    hyp = ['r=0 (无协整)', 'r<=1', 'r<=2']
    for i in range(3):
        stat_val = joh.lr1[i]
        crit_5 = joh.cvt[i, 1]
        rej = "拒绝" if stat_val > crit_5 else "不拒绝"
        print(f"  {hyp[i]:<15} 统计量={stat_val:.2f} 5%临界={crit_5:.2f}{rej}")
except ImportError:
    print("需更新 statsmodels")

5.4 配对交易信号

def pair_trading_signals(spread, entry_z=2.0, exit_z=0.5, window=60):
    """基于 z-score 的配对交易信号"""
    spread_mean = np.convolve(spread, np.ones(window)/window, mode='same')
    spread_std = np.array([np.std(spread[max(0,i-window):i]) + 1e-8 for i in range(len(spread))])
    z_score = (spread - spread_mean) / spread_std
 
    positions = np.zeros(len(spread))
    for i in range(1, len(spread)):
        if positions[i-1] == 0:
            if z_score[i] > entry_z:     positions[i] = -1  # 做空价差
            elif z_score[i] < -entry_z:  positions[i] = 1   # 做多价差
        else:
            positions[i] = 0 if abs(z_score[i]) < exit_z else positions[i-1]
    return positions
 
positions = pair_trading_signals(residuals)
trades = np.sum(np.diff(positions) != 0) // 2
print(f"\n配对交易统计: {trades} 笔, 持仓比 {np.mean(positions!=0):.1%}")

六、Granger 因果检验

如果用 的历史值预测 优于仅用 自身,则 Granger 因果

:

from statsmodels.tsa.stattools import grangercausalitytests
import numpy as np
 
np.random.seed(42)
n = 500
 
# X 领先 Y 2 期
X_lead = np.random.normal(0, 1, n)
Y_lag = np.zeros(n)
for t in range(2, n):
    Y_lag[t] = 0.5 * X_lead[t-2] + 0.3 * X_lead[t-1] + np.random.normal(0, 0.5)
 
data = np.column_stack([Y_lag, X_lead])
 
print("X → Y(应显著):")
grangercausalitytests(data, maxlag=5)
print("\nY → X(应不显著):")
grangercausalitytests(data[:, ::-1], maxlag=5)

七、Kalman 滤波器

7.1 状态空间模型

状态方程 观测方程

7.2 时变 Beta 估计

import numpy as np
 
class KalmanBeta:
    """Kalman 滤波估计时变 Beta"""
    def __init__(self, Q=1e-4, R=1e-4):
        self.Q = Q      # 过程噪声
        self.R = R      # 观测噪声
        self.x = 1.0    # Beta 初始估计
        self.P = 1.0    # 误差方差
 
    def filter(self, returns, market):
        n = len(returns)
        betas = np.zeros(n)
        for t in range(n):
            # 预测
            P_pred = self.P + self.Q
            # 更新
            H = market[t]
            K = P_pred * H / (H**2 * P_pred + self.R)
            self.x = self.x + K * (returns[t] - H * self.x)
            self.P = (1 - K * H) * P_pred
            betas[t] = self.x
        return betas
 
np.random.seed(42)
n = 500
market = np.random.normal(0.001, 0.02, n)
true_betas = np.ones(n)
true_betas[100:200] = 1.5; true_betas[200:350] = 0.8; true_betas[350:] = 1.2
returns = true_betas * market + np.random.normal(0, 0.01, n)
 
kf = KalmanBeta()
estimated = kf.filter(returns, market)
 
# 对比滚动 OLS
window = 60
rolling = np.zeros(n)
for t in range(window, n):
    x, y = market[t-window:t], returns[t-window:t]
    rolling[t] = np.dot(x, y) / np.dot(x, x)
 
print(f"Kalman Beta 标准差: {np.std(estimated):.4f}")
print(f"滚动 OLS 标准差:   {np.std(rolling[rolling!=0]):.4f}")
print(f"真实 Beta 标准差:  {np.std(true_betas):.4f}")

八、隐马尔可夫模型 (HMM)

8.1 模型

隐状态 S_t (不可观测):  牛市 / 震荡 / 熊市
    ↓ 发射分布
观测值 y_t (可观测):   高收益低波动 / 低波动 / 负收益高波动

8.2 手写 Gaussian HMM

import numpy as np
from scipy.stats import norm
 
class GaussianHMM:
    """高斯 HMM (Baum-Welch EM 算法)"""
    def __init__(self, K=2, max_iter=200, tol=1e-6):
        self.K = K
        self.max_iter = max_iter
        self.tol = tol
 
    def fit(self, obs):
        T = len(obs)
        # 初始化参数
        sorted_obs = np.sort(obs)
        split = T // self.K
        self.mu = np.array([np.mean(sorted_obs[i*split:(i+1)*split]) for i in range(self.K)])
        self.sigma = np.array([np.std(sorted_obs[i*split:(i+1)*split]) + 1e-6 for i in range(self.K)])
        self.A = np.full((self.K, self.K), 0.1)
        np.fill_diagonal(self.A, 0.9)
        self.pi = np.ones(self.K) / self.K
 
        prev_ll = -np.inf
        for it in range(self.max_iter):
            # 前向
            alpha = np.zeros((T, self.K))
            alpha[0] = self.pi * norm.pdf(obs[0], self.mu, self.sigma)
            sc = np.zeros(T)
            sc[0] = alpha[0].sum() + 1e-300
            alpha[0] /= sc[0]
            for t in range(1, T):
                for k in range(self.K):
                    alpha[t, k] = norm.pdf(obs[t], self.mu[k], self.sigma[k]) * \
                                  np.sum(alpha[t-1] * self.A[:, k])
                sc[t] = alpha[t].sum() + 1e-300
                alpha[t] /= sc[t]
 
            # 后向
            beta = np.zeros((T, self.K))
            beta[-1] = 1.0 / sc[-1]
            for t in range(T-2, -1, -1):
                for k in range(self.K):
                    beta[t, k] = np.sum(self.A[k] * norm.pdf(obs[t+1], self.mu, self.sigma) * beta[t+1])
                beta[t] /= sc[t+1]
 
            # gamma
            gamma = alpha * beta
            gamma /= gamma.sum(axis=1, keepdims=True) + 1e-300
 
            # M 步
            self.pi = gamma[0]
            for i in range(self.K):
                for j in range(self.K):
                    self.A[i, j] = np.sum(gamma[:-1, i] * norm.pdf(
                        obs[1:], self.mu[j], self.sigma[j]) * beta[1:, j]) / \
                        (np.sum(gamma[:-1, i] * alpha[:-1, i]) + 1e-300)
            for k in range(self.K):
                w = gamma[:, k]
                self.mu[k] = np.sum(w * obs) / (w.sum() + 1e-300)
                self.sigma[k] = np.sqrt(np.sum(w * (obs - self.mu[k])**2) / (w.sum() + 1e-300) + 1e-8)
 
            ll = np.sum(np.log(sc + 1e-300))
            if abs(ll - prev_ll) < self.tol:
                break
            prev_ll = ll
        return self
 
    def decode(self, obs):
        """Viterbi 最优状态序列"""
        T = len(obs)
        delta = np.zeros((T, self.K))
        psi = np.zeros((T, self.K), dtype=int)
        delta[0] = np.log(self.pi + 1e-300) + norm.logpdf(obs[0], self.mu, self.sigma)
        for t in range(1, T):
            for k in range(self.K):
                log_probs = delta[t-1] + np.log(self.A[:, k] + 1e-300)
                psi[t, k] = np.argmax(log_probs)
                delta[t, k] = log_probs[psi[t, k]] + norm.logpdf(obs[t], self.mu[k], self.sigma[k])
        states = np.zeros(T, dtype=int)
        states[-1] = np.argmax(delta[-1])
        for t in range(T-2, -1, -1):
            states[t] = psi[t+1, states[t+1]]
        return states
 
# 模拟三种市场状态
np.random.seed(42)
n = 1200
true_states = np.zeros(n, dtype=int)
true_states[300:600] = 1; true_states[600:900] = 2
params = {0: (0.001, 0.010), 1: (0.000, 0.008), 2: (-0.002, 0.025)}
obs = np.array([np.random.normal(*params[s]) for s in true_states])
 
print("训练 HMM (3 状态)...")
hmm = GaussianHMM(K=3)
hmm.fit(obs)
pred = hmm.decode(obs)
 
print(f"\n状态参数估计:")
for k in range(3):
    print(f"  状态 {k}: μ={hmm.mu[k]:.4f}, σ={hmm.sigma[k]:.4f}")
print(f"\n转移矩阵:\n{np.round(hmm.A, 3)}")

九、谱分析与 FFT

9.1 从时域到频域

将信号分解为不同频率的正弦波叠加,用 FFT(快速傅里叶变换)实现。

9.2 Python 实现

import numpy as np
import matplotlib.pyplot as plt
 
np.random.seed(42)
n = 1000
t = np.arange(n)
 
# 含周期成分的信号
signal = (2.0 * np.sin(2*np.pi*t/20) +    # 月度周期
          1.5 * np.sin(2*np.pi*t/50) +     # 季度周期
          1.0 * np.sin(2*np.pi*t/120) +    # 半年周期
          np.random.normal(0, 1.5, n))
 
# FFT
fft_vals = np.fft.fft(signal)
fft_freqs = np.fft.fftfreq(n)
power = (np.abs(fft_vals)**2 / n)[fft_freqs > 0]
freqs = fft_freqs[fft_freqs > 0]
periods = 1.0 / freqs
 
# 找主要周期
top_idx = np.argsort(power)[-5:][::-1]
print("功率最高的周期成分:")
for idx in top_idx:
    print(f"  周期 {periods[idx]:.1f} 天, 功率 {power[idx]:.2f}")
 
# 低通滤波去噪
def lowpass(signal, cutoff):
    """截断高频成分的去噪方法"""
    n = len(signal)
    fft_v = np.fft.fft(signal)
    freqs = np.fft.fftfreq(n)
    fft_v[np.abs(freqs) > cutoff] = 0
    return np.real(np.fft.ifft(fft_v))
 
filtered = lowpass(signal, cutoff=1.0/30)
print(f"\n原始波动率: {np.std(signal):.4f}")
print(f"滤波后:     {np.std(filtered):.4f}")
print(f"噪声:       {np.std(signal-filtered):.4f}")
 
fig, axes = plt.subplots(2, 1, figsize=(14, 7), sharex=True)
axes[0].plot(signal, linewidth=0.5, color='steelblue', label='原始')
axes[0].plot(filtered, linewidth=1, color='red', label='滤波')
axes[0].legend(); axes[0].set_title('信号去噪')
axes[1].bar(periods[periods < 200], power[periods < 200], width=1, color='darkorange')
for p in [20, 50, 120]:
    axes[1].axvline(p, color='red', ls='--', alpha=0.7)
axes[1].set_xlabel('周期(天)'); axes[1].set_title('功率谱')
plt.tight_layout()
plt.show()

总结

方法核心功能量化应用
ADF平稳性判断因子预处理、模型输入验证
ACF/PACF自相关识别模型阶数选择、残差诊断
ARIMA趋势和自回归收益率预测
GARCH波动率聚类风险管理、期权定价
协整长期均衡配对交易、统计套利
Granger 因果预测因果因子筛选、领先滞后
Kalman 滤波时变状态时变 Beta、动态因子
HMM隐状态识别市场状态切换
FFT频域分析周期检测、信号去噪

常见陷阱

  1. 对非平稳序列直接回归:产生伪回归,R平方虚高
  2. 忽略 GARCH 效应:只用 ARMA 会低估风险
  3. 协整关系可能失效:需在样本外验证稳定性
  4. Granger 因果 ≠ 真正因果:只是统计上的预测力
  5. ARIMA 过拟合:差分和滞后阶数不宜过高