时间序列分析
预计学习时间:3-4 小时
难度:⭐⭐⭐
核心问题:金融数据天然有序、时间依赖,如何用统计工具捕捉这种动态结构?
概述
时间序列分析是量化投资中不可或缺的数学工具。与截面数据不同,金融时间序列具有时序依赖——今天的收益率受昨天的影响,波动率存在聚集效应,多个价格序列之间可能存在长期均衡关系。
本节内容涵盖:
- 平稳性与检验:ADF 检验
- ACF/PACF:自相关函数与 Ljung-Box 检验
- ARIMA/SARIMA:经典自回归建模
- GARCH 族:波动率聚类建模
- 协整:Engle-Granger、Johansen、配对交易
- Granger 因果:领先滞后分析
- Kalman 滤波器:时变参数估计
- HMM:市场状态识别
- 谱分析与 FFT:频域分析
一、平稳性与 ADF 检验
1.1 平稳性定义
| 平稳性类型 | 定义 | 金融示例 |
|---|---|---|
| 弱平稳 | 均值、方差、自协方差恒定 | 对数收益率近似弱平稳 |
| 差分平稳 | 一阶差分后平稳 | 大多数股票价格序列 |
1.2 ADF 检验
原假设 :存在单位根(非平稳)。ADF 回归方程:
核心判断: 是否显著不为零。
import numpy as np
from statsmodels.tsa.stattools import adfuller
np.random.seed(42)
n = 500
# 生成三种序列
stationary = np.random.normal(0, 1, n) # 白噪声(平稳)
random_walk = np.cumsum(np.random.normal(0, 1, n)) # 随机游走(非平稳)
drift_walk = np.cumsum(np.random.normal(0.01, 1, n)) # 带漂移(非平稳)
def run_adf(series, name):
"""执行 ADF 检验并打印结果"""
result = adfuller(series, autolag='AIC')
print(f"\n{'='*50}")
print(f"ADF 检验: {name}")
print(f" ADF 统计量: {result[0]:.4f}")
print(f" p 值: {result[1]:.6f}")
conclusion = "平稳" if result[1] < 0.05 else "非平稳"
print(f" >>> 结论: 序列【{conclusion}】")
run_adf(stationary, "白噪声")
run_adf(random_walk, "随机游走")
run_adf(drift_walk, "带漂移随机游走")二、ACF / PACF 与 Ljung-Box 检验
2.1 ACF 与 PACF
ACF:,衡量序列与滞后值的线性相关。
PACF:在控制中间滞后影响后的纯相关性。ACF 是”总效应”,PACF 是”直接效应”。
2.2 用 ACF/PACF 识别 ARIMA 阶数
| 模型 | ACF 特征 | PACF 特征 |
|---|---|---|
| AR(p) | 拖尾(指数衰减) | p 阶后截尾 |
| MA(q) | q 阶后截尾 | 拖尾(指数衰减) |
| ARMA(p,q) | 拖尾 | 拖尾 |
2.3 Ljung-Box 检验
:前 阶自相关全部为零(纯随机序列)。
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
import matplotlib.pyplot as plt
np.random.seed(42)
# 生成 AR(2) 过程:y_t = 0.7*y_{t-1} + 0.2*y_{t-2} + noise
n = 500
noise = np.random.normal(0, 1, n)
ar_process = np.zeros(n)
for t in range(2, n):
ar_process[t] = 0.7 * ar_process[t-1] + 0.2 * ar_process[t-2] + noise[t]
# 计算 ACF 和 PACF
acf_vals, acf_ci = acf(ar_process, nlags=20, alpha=0.05)
pacf_vals, pacf_ci = pacf(ar_process, nlags=20, alpha=0.05)
# 绘图
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
lags = np.arange(21)
axes[0].bar(lags, acf_vals, color='steelblue', alpha=0.7)
axes[0].fill_between(lags, acf_ci[:, 0], acf_ci[:, 1], alpha=0.2, color='steelblue')
axes[0].axhline(y=0, color='black', linewidth=0.5)
axes[0].set_title('自相关函数 (ACF)')
axes[1].bar(lags, pacf_vals, color='darkorange', alpha=0.7)
axes[1].fill_between(lags, pacf_ci[:, 0], pacf_ci[:, 1], alpha=0.2, color='darkorange')
axes[1].axhline(y=0, color='black', linewidth=0.5)
axes[1].set_title('偏自相关函数 (PACF)')
plt.tight_layout()
plt.show()
# Ljung-Box 检验
lb_result = acorr_ljungbox(ar_process, lags=[5, 10, 15], return_df=True)
print("Ljung-Box 检验(AR(2)过程,应显著):")
print(lb_result)
lb_white = acorr_ljungbox(noise, lags=[5, 10, 15], return_df=True)
print("\nLjung-Box 检验(白噪声,应不显著):")
print(lb_white)三、ARIMA / SARIMA
3.1 ARIMA(p,d,q)
| 组件 | 名称 | 含义 |
|---|---|---|
| AR(p) | 自回归 | |
| I(d) | 差分 | 对序列做 d 阶差分 |
| MA(q) | 移动平均 |
3.2 参数选择与建模
from statsmodels.tsa.arima.model import ARIMA
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)
n = 1000
# 模拟 ARIMA(2,0,1) 数据
from scipy.signal import lfilter
ar_params = [1, -0.6, -0.2]
ma_params = [1, 0.4]
simulated = lfilter(ma_params, ar_params, np.random.normal(0, 1, n))
# 网格搜索最优参数(基于 AIC)
def select_arima(series, max_p=3, max_d=1, max_q=3):
"""AIC 网格搜索最优 ARIMA 参数"""
best_aic, best_order = np.inf, None
results = []
for p in range(max_p + 1):
for d in range(max_d + 1):
for q in range(max_q + 1):
try:
model = ARIMA(series, order=(p, d, q))
fitted = model.fit()
results.append({'order': (p, d, q), 'aic': fitted.aic})
if fitted.aic < best_aic:
best_aic = fitted.aic
best_order = (p, d, q)
except Exception:
continue
results.sort(key=lambda x: x['aic'])
print("Top 5 参数(按 AIC):")
for r in results[:5]:
print(f" {r['order']} → AIC={r['aic']:.2f}")
print(f"最优: {best_order}, AIC={best_aic:.2f}")
return best_order
best_order = select_arima(simulated)
# 拟合并预测
model = ARIMA(simulated, order=best_order)
result = model.fit()
forecast = result.forecast(steps=20)
print(f"\n未来 5 期预测: {np.round(forecast[:5], 4)}")3.3 SARIMA
对含季节性的数据使用 SARIMA(p,d,q)(P,D,Q,s):
# SARIMA 示例:季度数据
t = np.arange(400)
seasonal_data = 0.5 * np.sin(2 * np.pi * t / 4) + 0.001 * t + np.random.normal(0, 0.3, 400)
sarima = ARIMA(seasonal_data, order=(1,0,0), seasonal_order=(1,0,0,4))
sarima_result = sarima.fit()
print(f"SARIMA(1,0,0)(1,0,0,4) AIC: {sarima_result.aic:.2f}")四、GARCH 族
4.1 波动率聚类
大幅波动后跟随大幅波动,小幅波动后跟随小幅波动——这就是波动率聚类。
4.2 GARCH(1,1)
平稳条件:。波动率半衰期 = 。
| 参数 | 金融含义 |
|---|---|
| 对新信息(冲击)的反应速度 | |
| 波动率的持续性 | |
| 衰减速度,越接近 1 衰减越慢 |
4.3 GARCH 模拟与估计
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
def simulate_garch(omega, alpha, beta, n=2000):
"""模拟 GARCH(1,1) 过程"""
assert alpha + beta < 1
returns = np.zeros(n)
vols = np.zeros(n)
vols[0] = np.sqrt(omega / (1 - alpha - beta))
for t in range(1, n):
vols[t] = np.sqrt(omega + alpha * returns[t-1]**2 + beta * vols[t-1]**2)
returns[t] = np.random.normal(0, vols[t])
return returns, vols
# 模拟 GARCH(1,1)
omega, alpha, beta = 0.00001, 0.08, 0.90
returns, vols = simulate_garch(omega, alpha, beta)
# 绘图
fig, axes = plt.subplots(2, 1, figsize=(14, 7), sharex=True)
axes[0].plot(returns, linewidth=0.5, color='steelblue')
axes[0].set_title('GARCH(1,1) 模拟收益率')
axes[0].set_ylabel('收益率')
axes[1].plot(vols**2, linewidth=0.8, color='darkorange')
axes[1].set_title('条件方差(波动率聚类)')
axes[1].set_ylabel('方差')
axes[1].set_xlabel('时间')
plt.tight_layout()
plt.show()
print(f"参数: ω={omega}, α={alpha}, β={beta}")
print(f"无条件方差: {omega / (1-alpha-beta):.6f}")
print(f"波动率半衰期: {np.log(0.5)/np.log(alpha+beta):.1f} 天")
# 使用 arch 库估计
try:
from arch import arch_model
am = arch_model(returns * 100, vol='Garch', p=1, q=1)
fit = am.fit(disp='off')
print(f"\n估计结果:")
print(f" ω = {fit.params['omega']:.6f}")
print(f" α = {fit.params['alpha[1]']:.4f}")
print(f" β = {fit.params['beta[1]']:.4f}")
print(f" α+β = {fit.params['alpha[1]']+fit.params['beta[1]']:.4f}")
except ImportError:
print("需安装 arch 库: pip install arch")4.4 GARCH 变体
| 模型 | 特点 | 适用场景 |
|---|---|---|
| GARCH(1,1) | 基准模型 | 大多数场景 |
| EGARCH | 对称性/杠杆效应 | 负冲击产生更大波动 |
| GJR-GARCH | 门限效应 | 同上 |
| GARCH-M | 波动率影响均值 | 风险溢价建模 |
五、协整与配对交易
5.1 协整
两个非平稳序列 , 若存在线性组合 平稳,则称它们协整。
金融含义:两个价格虽然各自随机游走,但存在长期均衡,偏离后会均值回复。
5.2 Engle-Granger 两步法
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
np.random.seed(42)
n = 500
# 模拟协整序列: Y = 0.8*X + 平稳噪声
X = np.cumsum(np.random.normal(0.01, 0.5, n)) + 100
spread_noise = np.random.normal(0, 1, n)
Y = 0.8 * X + spread_noise
# 第一步:OLS 回归
X_aug = np.column_stack([np.ones(n), X])
beta_hat = np.linalg.lstsq(X_aug, Y, rcond=None)[0]
print(f"OLS: 截距={beta_hat[0]:.4f}, β={beta_hat[1]:.4f}")
# 第二步:对残差做 ADF 检验
residuals = Y - beta_hat[0] - beta_hat[1] * X
adf_p = adfuller(residuals, autolag='AIC')[1]
print(f"残差 ADF p 值: {adf_p:.6f} → {'协整存在' if adf_p < 0.05 else '不显著'}")
# 直接使用 coint 函数
stat, p, crit = coint(X, Y)
print(f"直接 coint 检验 p 值: {p:.6f}")
# 非协整对照
X2 = np.cumsum(np.random.normal(0, 1, n))
Y2 = np.cumsum(np.random.normal(0, 1, n))
_, p_nc = coint(X2, Y2)
print(f"非协整序列 p 值: {p_nc:.4f} (应 > 0.05)")5.3 Johansen 检验(多序列协整)
try:
from statsmodels.tsa.vector_ar.vecm import coint_johansen
n = 500
common = np.cumsum(np.random.normal(0.01, 0.5, n))
data = np.column_stack([
common + np.random.normal(0, 1, n),
1.5 * common + np.random.normal(0, 1, n),
0.7 * common + np.random.normal(0, 1, n)
])
joh = coint_johansen(data, det_order=0, k_ar_diff=2)
print("Johansen 迹检验:")
hyp = ['r=0 (无协整)', 'r<=1', 'r<=2']
for i in range(3):
stat_val = joh.lr1[i]
crit_5 = joh.cvt[i, 1]
rej = "拒绝" if stat_val > crit_5 else "不拒绝"
print(f" {hyp[i]:<15} 统计量={stat_val:.2f} 5%临界={crit_5:.2f} → {rej}")
except ImportError:
print("需更新 statsmodels")5.4 配对交易信号
def pair_trading_signals(spread, entry_z=2.0, exit_z=0.5, window=60):
"""基于 z-score 的配对交易信号"""
spread_mean = np.convolve(spread, np.ones(window)/window, mode='same')
spread_std = np.array([np.std(spread[max(0,i-window):i]) + 1e-8 for i in range(len(spread))])
z_score = (spread - spread_mean) / spread_std
positions = np.zeros(len(spread))
for i in range(1, len(spread)):
if positions[i-1] == 0:
if z_score[i] > entry_z: positions[i] = -1 # 做空价差
elif z_score[i] < -entry_z: positions[i] = 1 # 做多价差
else:
positions[i] = 0 if abs(z_score[i]) < exit_z else positions[i-1]
return positions
positions = pair_trading_signals(residuals)
trades = np.sum(np.diff(positions) != 0) // 2
print(f"\n配对交易统计: {trades} 笔, 持仓比 {np.mean(positions!=0):.1%}")六、Granger 因果检验
如果用 的历史值预测 优于仅用 自身,则 Granger 因果于 。
:
from statsmodels.tsa.stattools import grangercausalitytests
import numpy as np
np.random.seed(42)
n = 500
# X 领先 Y 2 期
X_lead = np.random.normal(0, 1, n)
Y_lag = np.zeros(n)
for t in range(2, n):
Y_lag[t] = 0.5 * X_lead[t-2] + 0.3 * X_lead[t-1] + np.random.normal(0, 0.5)
data = np.column_stack([Y_lag, X_lead])
print("X → Y(应显著):")
grangercausalitytests(data, maxlag=5)
print("\nY → X(应不显著):")
grangercausalitytests(data[:, ::-1], maxlag=5)七、Kalman 滤波器
7.1 状态空间模型
状态方程: 观测方程:
7.2 时变 Beta 估计
import numpy as np
class KalmanBeta:
"""Kalman 滤波估计时变 Beta"""
def __init__(self, Q=1e-4, R=1e-4):
self.Q = Q # 过程噪声
self.R = R # 观测噪声
self.x = 1.0 # Beta 初始估计
self.P = 1.0 # 误差方差
def filter(self, returns, market):
n = len(returns)
betas = np.zeros(n)
for t in range(n):
# 预测
P_pred = self.P + self.Q
# 更新
H = market[t]
K = P_pred * H / (H**2 * P_pred + self.R)
self.x = self.x + K * (returns[t] - H * self.x)
self.P = (1 - K * H) * P_pred
betas[t] = self.x
return betas
np.random.seed(42)
n = 500
market = np.random.normal(0.001, 0.02, n)
true_betas = np.ones(n)
true_betas[100:200] = 1.5; true_betas[200:350] = 0.8; true_betas[350:] = 1.2
returns = true_betas * market + np.random.normal(0, 0.01, n)
kf = KalmanBeta()
estimated = kf.filter(returns, market)
# 对比滚动 OLS
window = 60
rolling = np.zeros(n)
for t in range(window, n):
x, y = market[t-window:t], returns[t-window:t]
rolling[t] = np.dot(x, y) / np.dot(x, x)
print(f"Kalman Beta 标准差: {np.std(estimated):.4f}")
print(f"滚动 OLS 标准差: {np.std(rolling[rolling!=0]):.4f}")
print(f"真实 Beta 标准差: {np.std(true_betas):.4f}")八、隐马尔可夫模型 (HMM)
8.1 模型
隐状态 S_t (不可观测): 牛市 / 震荡 / 熊市
↓ 发射分布
观测值 y_t (可观测): 高收益低波动 / 低波动 / 负收益高波动
8.2 手写 Gaussian HMM
import numpy as np
from scipy.stats import norm
class GaussianHMM:
"""高斯 HMM (Baum-Welch EM 算法)"""
def __init__(self, K=2, max_iter=200, tol=1e-6):
self.K = K
self.max_iter = max_iter
self.tol = tol
def fit(self, obs):
T = len(obs)
# 初始化参数
sorted_obs = np.sort(obs)
split = T // self.K
self.mu = np.array([np.mean(sorted_obs[i*split:(i+1)*split]) for i in range(self.K)])
self.sigma = np.array([np.std(sorted_obs[i*split:(i+1)*split]) + 1e-6 for i in range(self.K)])
self.A = np.full((self.K, self.K), 0.1)
np.fill_diagonal(self.A, 0.9)
self.pi = np.ones(self.K) / self.K
prev_ll = -np.inf
for it in range(self.max_iter):
# 前向
alpha = np.zeros((T, self.K))
alpha[0] = self.pi * norm.pdf(obs[0], self.mu, self.sigma)
sc = np.zeros(T)
sc[0] = alpha[0].sum() + 1e-300
alpha[0] /= sc[0]
for t in range(1, T):
for k in range(self.K):
alpha[t, k] = norm.pdf(obs[t], self.mu[k], self.sigma[k]) * \
np.sum(alpha[t-1] * self.A[:, k])
sc[t] = alpha[t].sum() + 1e-300
alpha[t] /= sc[t]
# 后向
beta = np.zeros((T, self.K))
beta[-1] = 1.0 / sc[-1]
for t in range(T-2, -1, -1):
for k in range(self.K):
beta[t, k] = np.sum(self.A[k] * norm.pdf(obs[t+1], self.mu, self.sigma) * beta[t+1])
beta[t] /= sc[t+1]
# gamma
gamma = alpha * beta
gamma /= gamma.sum(axis=1, keepdims=True) + 1e-300
# M 步
self.pi = gamma[0]
for i in range(self.K):
for j in range(self.K):
self.A[i, j] = np.sum(gamma[:-1, i] * norm.pdf(
obs[1:], self.mu[j], self.sigma[j]) * beta[1:, j]) / \
(np.sum(gamma[:-1, i] * alpha[:-1, i]) + 1e-300)
for k in range(self.K):
w = gamma[:, k]
self.mu[k] = np.sum(w * obs) / (w.sum() + 1e-300)
self.sigma[k] = np.sqrt(np.sum(w * (obs - self.mu[k])**2) / (w.sum() + 1e-300) + 1e-8)
ll = np.sum(np.log(sc + 1e-300))
if abs(ll - prev_ll) < self.tol:
break
prev_ll = ll
return self
def decode(self, obs):
"""Viterbi 最优状态序列"""
T = len(obs)
delta = np.zeros((T, self.K))
psi = np.zeros((T, self.K), dtype=int)
delta[0] = np.log(self.pi + 1e-300) + norm.logpdf(obs[0], self.mu, self.sigma)
for t in range(1, T):
for k in range(self.K):
log_probs = delta[t-1] + np.log(self.A[:, k] + 1e-300)
psi[t, k] = np.argmax(log_probs)
delta[t, k] = log_probs[psi[t, k]] + norm.logpdf(obs[t], self.mu[k], self.sigma[k])
states = np.zeros(T, dtype=int)
states[-1] = np.argmax(delta[-1])
for t in range(T-2, -1, -1):
states[t] = psi[t+1, states[t+1]]
return states
# 模拟三种市场状态
np.random.seed(42)
n = 1200
true_states = np.zeros(n, dtype=int)
true_states[300:600] = 1; true_states[600:900] = 2
params = {0: (0.001, 0.010), 1: (0.000, 0.008), 2: (-0.002, 0.025)}
obs = np.array([np.random.normal(*params[s]) for s in true_states])
print("训练 HMM (3 状态)...")
hmm = GaussianHMM(K=3)
hmm.fit(obs)
pred = hmm.decode(obs)
print(f"\n状态参数估计:")
for k in range(3):
print(f" 状态 {k}: μ={hmm.mu[k]:.4f}, σ={hmm.sigma[k]:.4f}")
print(f"\n转移矩阵:\n{np.round(hmm.A, 3)}")九、谱分析与 FFT
9.1 从时域到频域
将信号分解为不同频率的正弦波叠加,用 FFT(快速傅里叶变换)实现。
9.2 Python 实现
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
n = 1000
t = np.arange(n)
# 含周期成分的信号
signal = (2.0 * np.sin(2*np.pi*t/20) + # 月度周期
1.5 * np.sin(2*np.pi*t/50) + # 季度周期
1.0 * np.sin(2*np.pi*t/120) + # 半年周期
np.random.normal(0, 1.5, n))
# FFT
fft_vals = np.fft.fft(signal)
fft_freqs = np.fft.fftfreq(n)
power = (np.abs(fft_vals)**2 / n)[fft_freqs > 0]
freqs = fft_freqs[fft_freqs > 0]
periods = 1.0 / freqs
# 找主要周期
top_idx = np.argsort(power)[-5:][::-1]
print("功率最高的周期成分:")
for idx in top_idx:
print(f" 周期 {periods[idx]:.1f} 天, 功率 {power[idx]:.2f}")
# 低通滤波去噪
def lowpass(signal, cutoff):
"""截断高频成分的去噪方法"""
n = len(signal)
fft_v = np.fft.fft(signal)
freqs = np.fft.fftfreq(n)
fft_v[np.abs(freqs) > cutoff] = 0
return np.real(np.fft.ifft(fft_v))
filtered = lowpass(signal, cutoff=1.0/30)
print(f"\n原始波动率: {np.std(signal):.4f}")
print(f"滤波后: {np.std(filtered):.4f}")
print(f"噪声: {np.std(signal-filtered):.4f}")
fig, axes = plt.subplots(2, 1, figsize=(14, 7), sharex=True)
axes[0].plot(signal, linewidth=0.5, color='steelblue', label='原始')
axes[0].plot(filtered, linewidth=1, color='red', label='滤波')
axes[0].legend(); axes[0].set_title('信号去噪')
axes[1].bar(periods[periods < 200], power[periods < 200], width=1, color='darkorange')
for p in [20, 50, 120]:
axes[1].axvline(p, color='red', ls='--', alpha=0.7)
axes[1].set_xlabel('周期(天)'); axes[1].set_title('功率谱')
plt.tight_layout()
plt.show()总结
| 方法 | 核心功能 | 量化应用 |
|---|---|---|
| ADF | 平稳性判断 | 因子预处理、模型输入验证 |
| ACF/PACF | 自相关识别 | 模型阶数选择、残差诊断 |
| ARIMA | 趋势和自回归 | 收益率预测 |
| GARCH | 波动率聚类 | 风险管理、期权定价 |
| 协整 | 长期均衡 | 配对交易、统计套利 |
| Granger 因果 | 预测因果 | 因子筛选、领先滞后 |
| Kalman 滤波 | 时变状态 | 时变 Beta、动态因子 |
| HMM | 隐状态识别 | 市场状态切换 |
| FFT | 频域分析 | 周期检测、信号去噪 |
常见陷阱
- 对非平稳序列直接回归:产生伪回归,R平方虚高
- 忽略 GARCH 效应:只用 ARMA 会低估风险
- 协整关系可能失效:需在样本外验证稳定性
- Granger 因果 ≠ 真正因果:只是统计上的预测力
- ARIMA 过拟合:差分和滞后阶数不宜过高