06-实战案例
本节通过一个完整的端到端项目,展示如何用深度学习(LSTM+Attention)进行量化时序预测,包括数据处理、模型构建、训练、评估和回测。
项目概述
本项目实现一个完整的量化深度学习预测系统:
项目流程:
1. 数据生成 ─→ 2. 预处理 ─→ 3. Dataset构建 ─→ 4. 模型定义 ─→ 5. 训练 ─→ 6. 评估 ─→ 7. 回测
│ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼
模拟数据 标准化 DataLoader LSTM+Att 早停调度 IC/RankIC 分层回测
1. 数据生成
import numpy as np
import pandas as pd
import scipy.stats
def generate_quant_data(
    n_stocks=300,
    n_days=500,  # roughly 2 years of trading days
    n_features=20,
    noise_level=0.01,
    trend_strength=0.0002,
    momentum_signal_strength=0.3,
    mean_reversion_signal_strength=0.2,
    random_seed=42
):
    """Generate a simulated cross-sectional equity data panel.

    Mimics stylized facts of real stock data:
    - price trends (GBM-style drift + diffusion)
    - momentum effect (continuation of recent returns)
    - mean reversion (pull back toward a 60-day mean)
    - cross-sectional correlation via a common market factor

    Args:
        n_stocks: number of stocks
        n_days: number of trading days
        n_features: number of features per stock (first 10 real, rest noise)
        noise_level: idiosyncratic noise level
        trend_strength: drift strength
        momentum_signal_strength: momentum signal strength
        mean_reversion_signal_strength: mean-reversion signal strength
        random_seed: RNG seed for reproducibility

    Returns:
        prices: (n_days, n_stocks) price paths, day-0 price normalized to 100
        features: (n_days, n_stocks, n_features) feature panel
        returns: (n_days, n_stocks) daily simple returns
    """
    np.random.seed(random_seed)
    # 1. Return-generating process (geometric-Brownian-motion style)
    dt = 1 / 252  # daily frequency
    drift = trend_strength * dt
    diffusion = noise_level * np.sqrt(dt)
    # True returns carrying momentum and mean-reversion signals
    true_returns = np.zeros((n_days, n_stocks))
    for t in range(1, n_days):
        # Momentum: continuation of the trailing 20-day average return
        momentum_signal = np.mean(true_returns[max(0, t-20):t], axis=0) * momentum_signal_strength
        # Mean reversion: pull back toward the trailing 60-day mean
        if t >= 60:
            price_mean = np.mean(true_returns[t-60:t], axis=0)
            mean_reversion_signal = -price_mean * mean_reversion_signal_strength
        else:
            mean_reversion_signal = 0
        # Common (market) factor shared by all stocks this day
        market_factor = np.random.normal(0, 1) * 0.5
        # Idiosyncratic per-stock shocks
        stock_specific = np.random.normal(0, 1, n_stocks)
        true_returns[t] = (
            drift +
            momentum_signal +
            mean_reversion_signal +
            market_factor * 0.3 +
            stock_specific * diffusion
        )
    # 2. Prices = cumulative product of gross returns
    prices = np.cumprod(1 + true_returns, axis=0)
    prices = prices / prices[0] * 100  # normalize initial price to 100
    # 3. Features (indices 0-9 are technical indicators, 10+ are noise)
    features = np.zeros((n_days, n_stocks, n_features))
    for t in range(60, n_days):
        past_returns = true_returns[t-60:t]
        # Momentum features
        features[t, :, 0] = np.mean(past_returns[-5:], axis=0)   # 5-day momentum
        features[t, :, 1] = np.mean(past_returns[-10:], axis=0)  # 10-day momentum
        features[t, :, 2] = np.mean(past_returns[-20:], axis=0)  # 20-day momentum
        # Volatility features
        features[t, :, 3] = np.std(past_returns[-5:], axis=0)   # 5-day volatility
        features[t, :, 4] = np.std(past_returns[-20:], axis=0)  # 20-day volatility
        # Price-position feature: z-score of current price vs 60-day window
        past_prices = prices[t-60:t]
        features[t, :, 5] = (prices[t] - np.mean(past_prices, axis=0)) / np.std(past_prices, axis=0)
        # RSI over the trailing 14 days
        gains = np.where(past_returns[-14:] > 0, past_returns[-14:], 0)
        losses = np.where(past_returns[-14:] < 0, -past_returns[-14:], 0)
        avg_gain = np.mean(gains, axis=0)
        avg_loss = np.mean(losses, axis=0)
        rs = avg_gain / np.where(avg_loss == 0, 1e-6, avg_loss)  # guard zero losses
        features[t, :, 6] = 100 - 100 / (1 + rs)  # RSI
        # Volume proxy (absolute return stands in for volume change)
        features[t, :, 7] = np.abs(true_returns[t])
        # Cross-sectional rank features, scaled into (0, 1]
        features[t, :, 8] = scipy.stats.rankdata(features[t, :, 0]) / n_stocks  # momentum rank
        features[t, :, 9] = scipy.stats.rankdata(features[t, :, 3]) / n_stocks  # volatility rank
    # Backfill the first 60 days with the mean of days 60-64
    for t in range(60):
        features[t] = np.mean(features[60:65], axis=0)
    # Remaining feature slots are pure noise
    for i in range(10, n_features):
        features[:, :, i] = np.random.randn(n_days, n_stocks) * 0.1
    return prices, features, true_returns
# Generate the simulated data panel
import scipy.stats
prices, features, returns = generate_quant_data(
    n_stocks=300,
    n_days=500,
    n_features=20,
    random_seed=42
)
print(f"数据形状:")
print(f" 价格: {prices.shape}")
print(f" 特征: {features.shape}")
print(f" 收益率: {returns.shape}")
# Sanity-check plot of a few simulated price paths
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.plot(prices[:, :10])
plt.title('模拟股票价格(前10只)')
plt.xlabel('交易日')
plt.ylabel('价格(归一化)')
plt.grid(True)
plt.show()
2. 数据预处理
def preprocess_data(features, returns, train_ratio=0.7, val_ratio=0.15):
    """Standardize features cross-sectionally and split the panel by time.

    Every (day, feature) slice is z-scored across stocks, then the panel is
    carved chronologically into train / validation / test segments (no
    shuffling — this is time-series data).

    Args:
        features: (n_days, n_stocks, n_features) feature panel
        returns: (n_days, n_stocks) daily returns
        train_ratio: fraction of days used for training
        val_ratio: fraction of days used for validation

    Returns:
        ((train_features, val_features, test_features),
         (train_returns, val_returns, test_returns))
    """
    n_days = features.shape[0]
    train_end = int(n_days * train_ratio)
    val_end = int(n_days * (train_ratio + val_ratio))
    # 1. Cross-sectional z-score, independently per day and per feature
    features_norm = np.zeros_like(features)
    for day in range(n_days):
        for feat in range(features.shape[2]):
            cross_section = features[day, :, feat].copy()
            mu = np.mean(cross_section)
            sigma = np.std(cross_section)
            # Degenerate slices (constant across stocks) map to zero.
            features_norm[day, :, feat] = (
                (cross_section - mu) / sigma if sigma > 0 else 0
            )
    # 2. Chronological split
    train_features = features_norm[:train_end]
    val_features = features_norm[train_end:val_end]
    test_features = features_norm[val_end:]
    train_returns = returns[:train_end]
    val_returns = returns[train_end:val_end]
    test_returns = returns[val_end:]
    print("数据划分:")
    print(f" 训练集: {train_end} 天 ({train_ratio*100:.0f}%)")
    print(f" 验证集: {val_end - train_end} 天 ({val_ratio*100:.0f}%)")
    print(f" 测试集: {n_days - val_end} 天 ({(1-train_ratio-val_ratio)*100:.0f}%)")
    return (
        (train_features, val_features, test_features),
        (train_returns, val_returns, test_returns)
    )
# Preprocess the full panel
(train_feat, val_feat, test_feat), (train_ret, val_ret, test_ret) = preprocess_data(
    features, returns, train_ratio=0.7, val_ratio=0.15
)
# Sanity check: cross-sectional mean ~0 and std ~1 after standardization
print(f"\n标准化后特征统计(训练集前1天):")
print(f" 均值: {np.mean(train_feat[0], axis=0)[:5]}")  # first 5 features
print(f" 标准差: {np.std(train_feat[0], axis=0)[:5]}")
3. Dataset 和 DataLoader
import torch
from torch.utils.data import Dataset, DataLoader
class QuantDataset(Dataset):
    """Sliding-window dataset over a (days, stocks, features) panel.

    Each sample is `seq_len` consecutive days of features for ALL stocks,
    labeled with the cross-section of returns `horizon` days after the
    window ends.
    """

    def __init__(self, features, returns, seq_len=20, horizon=1):
        """
        Args:
            features: (n_days, n_stocks, n_features) array
            returns: (n_days, n_stocks) array
            seq_len: length of the input window in days
            horizon: how many days ahead the label return is (1 = next day)
        """
        self.features = torch.tensor(features, dtype=torch.float32)
        self.returns = torch.tensor(returns, dtype=torch.float32)
        self.seq_len = seq_len
        self.horizon = horizon
        self.n_days, self.n_stocks, self.n_features = features.shape

    def __len__(self):
        # Last valid start index leaves room for seq_len inputs + horizon label.
        return self.n_days - self.seq_len - self.horizon + 1

    def __getitem__(self, idx):
        # Feature window covering days [idx, idx + seq_len)
        x = self.features[idx:idx + self.seq_len]  # (seq_len, n_stocks, n_features)
        # Label: return `horizon` days after the window ends.
        # Fix: the original ignored `horizon` and always returned the next-day
        # return; identical for horizon=1, correct for horizon > 1 (which
        # __len__ already accounted for).
        y = self.returns[idx + self.seq_len + self.horizon - 1]  # (n_stocks,)
        return x, y
# Build the three datasets
seq_len = 20
horizon = 1
train_dataset = QuantDataset(train_feat, train_ret, seq_len, horizon)
val_dataset = QuantDataset(val_feat, val_ret, seq_len, horizon)
test_dataset = QuantDataset(test_feat, test_ret, seq_len, horizon)
# Build the DataLoaders
batch_size = 32
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=False,  # time-series data: keep chronological order
    num_workers=0,
    drop_last=True
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0
)
print(f"数据集大小:")
print(f" 训练集: {len(train_dataset)} 样本, {len(train_loader)} 批次")
print(f" 验证集: {len(val_dataset)} 样本, {len(val_loader)} 批次")
print(f" 测试集: {len(test_dataset)} 样本, {len(test_loader)} 批次")
# Inspect one batch's shapes
for x, y in train_loader:
    print(f"\n批次形状:")
    print(f" X: {x.shape} # (batch, seq_len, n_stocks, n_features)")
    print(f" y: {y.shape} # (batch, n_stocks)")
    break
4. 模型定义(LSTM + Attention)
import torch
import torch.nn as nn
import torch.nn.functional as F
class BahdanauAttention(nn.Module):
    """Additive (Bahdanau) attention over the time axis, per stock.

    Scores each encoder time step against a per-stock query state and returns
    the attention-weighted sum over time as a per-stock context vector.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Scoring MLP: energy = v^T tanh(W [query; key])
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """
        Args:
            hidden: (batch, n_stocks, hidden_size) query state
            encoder_outputs: (batch, seq_len, n_stocks, hidden_size)
        Returns:
            context: (batch, n_stocks, hidden_size)
            attn_weights: (batch, n_stocks, seq_len)
        """
        batch, seq_len, n_stocks, hidden_size = encoder_outputs.size()
        # Broadcast the query across all time steps.
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1, 1)  # (batch, seq_len, n_stocks, hidden)
        # Attention energies
        combined = torch.cat([hidden, encoder_outputs], dim=-1)  # (batch, seq_len, n_stocks, 2*hidden)
        energy = torch.tanh(self.attn(combined))                 # (batch, seq_len, n_stocks, hidden)
        attention = self.v(energy).squeeze(-1)                   # (batch, seq_len, n_stocks)
        # Normalize over time (dim=1 is seq_len).
        attn_weights = F.softmax(attention, dim=1)               # (batch, seq_len, n_stocks)
        # Weighted sum over the TIME axis for each stock.
        # Fix: the original einsum 'bsnh,bsn->bsh' summed over stocks (n),
        # producing a (batch, seq_len, hidden) tensor that cannot be
        # concatenated with the (batch, n_stocks, hidden) query downstream
        # whenever seq_len != n_stocks.
        context = torch.einsum('bsnh,bsn->bnh', encoder_outputs, attn_weights)
        return context, attn_weights.transpose(1, 2)  # (batch, n_stocks, seq_len)
class LSTMAttentionModel(nn.Module):
    """Per-stock LSTM encoder with temporal attention for return prediction.

    Every stock's feature sequence is encoded independently by a shared LSTM
    (stocks are folded into the batch dimension); attention over the encoder
    outputs yields a context vector that is combined with the final hidden
    state to predict the next-period return for each stock.
    """

    def __init__(
        self,
        n_features,
        n_stocks,
        hidden_size=64,
        num_layers=2,
        dropout=0.2
    ):
        super().__init__()
        self.n_features = n_features
        self.n_stocks = n_stocks
        self.hidden_size = hidden_size
        # Project raw per-stock features into the LSTM input space.
        self.input_projection = nn.Linear(n_features, hidden_size)
        # Shared temporal encoder (dropout only applies between stacked layers).
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        # Temporal attention over encoder outputs.
        self.attention = BahdanauAttention(hidden_size)
        # Prediction head applied to [last_hidden; context].
        self.output_layers = nn.ModuleList([
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, 1)
        ])

    def forward(self, x):
        """
        Args:
            x: (batch, seq_len, n_stocks, n_features)
        Returns:
            predictions: (batch, n_stocks)
            attn_weights: (batch, n_stocks, seq_len)
        """
        batch, seq_len, n_stocks, n_features = x.size()
        # Fold stocks into the batch so one LSTM pass covers every
        # (sample, stock) pair.
        flat = x.permute(0, 2, 1, 3).reshape(batch * n_stocks, seq_len, n_features)
        flat = self.input_projection(flat)  # (batch*n_stocks, seq_len, hidden)
        encoded, _ = self.lstm(flat)        # (batch*n_stocks, seq_len, hidden)
        # Unfold back to (batch, seq_len, n_stocks, hidden).
        encoded = encoded.reshape(batch, n_stocks, seq_len, self.hidden_size)
        encoded = encoded.permute(0, 2, 1, 3)
        # Query with the final time step's hidden state.
        last_hidden = encoded[:, -1]  # (batch, n_stocks, hidden)
        context, attn_weights = self.attention(last_hidden, encoded)
        # Combine query and context, then run the prediction head.
        out = torch.cat([last_hidden, context], dim=-1)  # (batch, n_stocks, 2*hidden)
        for layer in self.output_layers:
            out = layer(out)
        return out.squeeze(-1), attn_weights
# Instantiate the model
model = LSTMAttentionModel(
    n_features=20,
    n_stocks=300,
    hidden_size=64,
    num_layers=2,
    dropout=0.2
)
# Smoke-test the forward pass on one batch
x, y = next(iter(train_loader))
with torch.no_grad():
    predictions, attn_weights = model(x)
print(f"模型输出:")
print(f" 预测: {predictions.shape}") # (batch, n_stocks)
print(f" 注意力权重: {attn_weights.shape}") # (batch, n_stocks, seq_len)
# Parameter count
total_params = sum(p.numel() for p in model.parameters())
print(f"\n模型参数量: {total_params:,}")
5. 训练
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
# Device selection (GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# Loss and optimizer (AdamW = Adam with decoupled weight decay)
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
# LR scheduler: halve the LR when validation loss plateaus
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-6
)
# Early stopping
class EarlyStopping:
    """Stop training when validation loss stops improving.

    Tracks the best (lowest) validation loss seen so far, keeps a snapshot
    of the model weights at that point, and sets ``early_stop`` after
    ``patience`` consecutive non-improving epochs.
    """

    def __init__(self, patience=10, min_delta=0):
        """
        Args:
            patience: epochs without improvement before stopping
            min_delta: minimum score improvement to count as progress
        """
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        # Fix: __call__ reads self.best_score, but the original only
        # initialized self.best_loss -> AttributeError on the first call.
        self.best_score = None
        self.best_loss = None
        self.early_stop = False
        self.best_model_state = None

    def __call__(self, val_loss, model):
        score = -val_loss  # negate so that higher score = better
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0

    def save_checkpoint(self, model):
        # Fix: state_dict().copy() is a SHALLOW copy -- the saved tensors
        # would keep updating as training continues. Clone each tensor so the
        # snapshot is frozen at the best epoch.
        self.best_model_state = {
            k: v.detach().clone() for k, v in model.state_dict().items()
        }
# Early-stopping tracker: stop after 10 non-improving epochs
early_stopping = EarlyStopping(patience=10)
# Training loop helpers
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-batch loss.

    Each batch goes through the standard cycle: forward pass, loss,
    backward pass with gradient clipping (max norm 1.0), optimizer step.
    """
    model.train()
    running_loss = 0.0
    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        # Forward pass (model returns (predictions, attention weights))
        preds, _ = model(inputs)
        batch_loss = criterion(preds, targets)
        # Backward pass
        optimizer.zero_grad()
        batch_loss.backward()
        # Clip gradients to stabilize recurrent training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        running_loss += batch_loss.item()
    return running_loss / len(dataloader)
def validate(model, dataloader, criterion, device):
    """Return the mean per-batch loss over a dataloader (no weight updates)."""
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            preds, _ = model(inputs)
            running_loss += criterion(preds, targets).item()
    return running_loss / len(dataloader)
# Main training loop
epochs = 50
train_losses = []
val_losses = []
print("开始训练...")
for epoch in range(epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss = validate(model, val_loader, criterion, device)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    # ReduceLROnPlateau steps on the monitored validation loss
    scheduler.step(val_loss)
    print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch+1}")
        break
# Restore the weights from the best validation epoch
model.load_state_dict(early_stopping.best_model_state)
# Plot the loss curves
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 4))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)
plt.show()
6. 评估
def evaluate_model(model, dataloader, device):
    """Collect model predictions and ground-truth labels over a dataloader.

    Returns:
        (predictions, targets): numpy arrays concatenated over all batches.
    """
    model.eval()
    pred_batches = []
    target_batches = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            preds, _ = model(inputs)
            pred_batches.append(preds.cpu())
            target_batches.append(labels.cpu())
    return (
        torch.cat(pred_batches, dim=0).numpy(),
        torch.cat(target_batches, dim=0).numpy(),
    )
# Evaluation metrics
def calculate_metrics(predictions, targets):
    """Compute regression and correlation metrics over pooled predictions.

    Flattens both arrays (pooling all days and stocks together) and reports
    MSE, MAE, IC (Pearson correlation) and Rank IC (Spearman correlation)
    with their p-values.
    """
    from scipy.stats import pearsonr, spearmanr
    preds = predictions.flatten()
    truths = targets.flatten()
    errors = preds - truths
    # IC = Pearson correlation; Rank IC = Spearman rank correlation
    ic, ic_pvalue = pearsonr(preds, truths)
    rank_ic, rank_ic_pvalue = spearmanr(preds, truths)
    return {
        'MSE': np.mean(errors ** 2),
        'MAE': np.mean(np.abs(errors)),
        'IC': ic,
        'IC_pvalue': ic_pvalue,
        'Rank IC': rank_ic,
        'Rank IC_pvalue': rank_ic_pvalue
    }
# Evaluate on all three splits
train_pred, train_true = evaluate_model(model, train_loader, device)
val_pred, val_true = evaluate_model(model, val_loader, device)
test_pred, test_true = evaluate_model(model, test_loader, device)
# Compute metrics per split
train_metrics = calculate_metrics(train_pred, train_true)
val_metrics = calculate_metrics(val_pred, val_true)
test_metrics = calculate_metrics(test_pred, test_true)
# Print a comparison table
print("\n评估指标:")
print(f"{'指标':<15} {'训练集':>12} {'验证集':>12} {'测试集':>12}")
print("-" * 55)
for metric in ['MSE', 'MAE', 'IC', 'Rank IC']:
    print(f"{metric:<15} {train_metrics[metric]:>12.6f} {val_metrics[metric]:>12.6f} {test_metrics[metric]:>12.6f}")
7. 分层回测
def backtest(predictions, targets, n_quantiles=5):
    """Layered (quantile) backtest.

    For each day, buckets the stocks into `n_quantiles` groups by predicted
    value and records each group's equal-weighted realized return.

    Args:
        predictions: (n_samples, n_stocks) model scores
        targets: (n_samples, n_stocks) realized returns
        n_quantiles: number of buckets (Q1 = lowest predictions)

    Returns:
        portfolio_returns: (n_samples, n_quantiles) per-day bucket returns
    """
    n_samples = predictions.shape[0]
    portfolio_returns = np.zeros((n_samples, n_quantiles))
    for day in range(n_samples):
        scores = predictions[day]
        realized = targets[day]
        # Bucket by predicted score; duplicates='drop' guards against ties.
        buckets = pd.qcut(scores, n_quantiles, labels=False, duplicates='drop')
        for q in range(n_quantiles):
            members = buckets == q
            # A bucket may be empty after duplicate bin edges are dropped.
            if members.sum() > 0:
                portfolio_returns[day, q] = realized[members].mean()
    return portfolio_returns
# Run the layered backtest on the test set
test_portfolio_returns = backtest(test_pred, test_true, n_quantiles=5)
# Cumulative return path per quantile
cumulative_returns = np.cumprod(1 + test_portfolio_returns, axis=0) - 1
# Plot quantile performance
plt.figure(figsize=(12, 5))
# Left: cumulative return curves
plt.subplot(1, 2, 1)
for q in range(5):
    plt.plot(cumulative_returns[:, q] * 100, label=f'Q{q+1}')
plt.xlabel('Days')
plt.ylabel('Cumulative Returns (%)')
plt.title('分层累计收益')
plt.legend()
plt.grid(True)
# Right: average daily return per quantile
plt.subplot(1, 2, 2)
avg_returns = test_portfolio_returns.mean(axis=0) * 10000  # convert to basis points
plt.bar(range(5), avg_returns)
plt.xlabel('Quantile')
plt.ylabel('Avg Daily Return (bps)')
plt.title('各层平均日收益')
plt.grid(True)
plt.tight_layout()
plt.show()
# Long-short spread: top quantile minus bottom quantile
long_short_return = test_portfolio_returns[:, -1] - test_portfolio_returns[:, 0]
long_short_cumulative = np.cumprod(1 + long_short_return) - 1
print(f"\n多空策略 (Q5 - Q1):")
print(f" 累计收益: {long_short_cumulative[-1]*100:.2f}%")
print(f" 年化收益: {(1+long_short_cumulative[-1])**(252/len(long_short_return))-1:.2%}")
print(f" 夏普比率: {long_short_return.mean()/long_short_return.std()*np.sqrt(252):.2f}")
print(f" 胜率: {(long_short_return > 0).mean():.2%}")
8. 滚动训练
def rolling_train(
    model_class,
    data_dict,
    window_size=350,
    retrain_freq=50,  # retrain every 50 days
    **model_kwargs
):
    """Walk-forward (rolling) training.

    Simulates live trading: every `retrain_freq` days a fresh model is
    trained on the trailing `window_size` days and then used to predict the
    next `retrain_freq` days, one day at a time.

    Args:
        model_class: model constructor (e.g. LSTMAttentionModel)
        data_dict: {'features': (n_days, n_stocks, n_features) array,
                    'returns': (n_days, n_stocks) array}
        window_size: trailing training window length in days
        retrain_freq: days between retrains (also the prediction span)
        **model_kwargs: forwarded to model_class

    Returns:
        np.ndarray of per-day cross-sectional predictions.
    """
    all_features, all_returns = data_dict['features'], data_dict['returns']
    n_days = len(all_features)
    predictions_list = []
    # First prediction day: once a full training window is available
    start = window_size
    while start + retrain_freq < n_days:
        end = min(start + retrain_freq, n_days)
        print(f"\n训练区间: [{max(0, start-window_size)}, {start}), 预测区间: [{start}, {end})")
        # Training slice. Fix: .copy() is required -- the original normalized
        # a numpy VIEW in place, corrupting all_features and re-standardizing
        # overlapping windows on later iterations.
        train_feat = all_features[max(0, start-window_size):start].copy()
        train_ret = all_returns[max(0, start-window_size):start]
        # Cross-sectional standardization of the training window
        for t in range(len(train_feat)):
            for f in range(train_feat.shape[2]):
                cross_section = train_feat[t, :, f]
                mean = np.mean(cross_section)
                std = np.std(cross_section)
                if std > 0:
                    train_feat[t, :, f] = (cross_section - mean) / std
        # Dataset/loader over the standardized window
        train_dataset = QuantDataset(train_feat, train_ret, seq_len=20, horizon=1)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
        # Fresh model per window
        model = model_class(**model_kwargs).to(device)
        criterion = nn.MSELoss()
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
        # Short training run (fewer epochs for rolling retrains)
        for epoch in range(10):
            train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
            if (epoch + 1) % 5 == 0:
                print(f" Epoch {epoch+1}/10, Loss: {train_loss:.6f}")
        # Predict day by day over [start, end)
        model.eval()
        with torch.no_grad():
            for t in range(start, end):
                if t >= 20:
                    # Standardize the input window the same way as training data
                    # (.copy() already prevents mutating all_features here).
                    x = all_features[t-20:t].copy()
                    for i in range(20):
                        for f in range(x.shape[2]):
                            cross_section = x[i, :, f]
                            mean = np.mean(cross_section)
                            std = np.std(cross_section)
                            if std > 0:
                                x[i, :, f] = (cross_section - mean) / std
                    x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(0).to(device)
                    pred, _ = model(x_tensor)
                    predictions_list.append(pred.squeeze(0).cpu().numpy())
        start = end
    return np.array(predictions_list)
# 执行滚动训练(简化版,只演示概念)
# rolling_predictions = rolling_train(
# LSTMAttentionModel,
# data_dict={'features': features, 'returns': returns},
# window_size=350,
# retrain_freq=50,
# n_features=20,
# n_stocks=300,
# hidden_size=64,
# num_layers=2,
# dropout=0.2
# )
9. 树模型 vs 深度学习对比
# 使用 LightGBM 作为对比
import lightgbm as lgb
from sklearn.metrics import mean_squared_error
# 准备 LightGBM 数据
def prepare_lgb_data(features, returns, seq_len=20):
    """Flatten rolling feature windows into a tabular matrix for GBDT models.

    For every day t >= seq_len, the previous `seq_len` days of features are
    flattened per stock into a single row; the label is that day's return.

    Returns:
        X: (n_usable_days * n_stocks, seq_len * n_features)
        y: (n_usable_days * n_stocks,)
    """
    n_stocks = features.shape[1]
    rows = []
    labels = []
    for t in range(seq_len, len(features)):
        window = features[t-seq_len:t]
        # (seq_len, n_stocks, n_feat) -> (n_stocks, seq_len * n_feat)
        rows.append(window.transpose(1, 0, 2).reshape(n_stocks, -1))
        labels.append(returns[t])
    return np.vstack(rows), np.hstack(labels)
# Prepare tabular data for LightGBM
X_train, y_train = prepare_lgb_data(train_feat, train_ret, seq_len=20)
X_val, y_val = prepare_lgb_data(val_feat, val_ret, seq_len=20)
X_test, y_test = prepare_lgb_data(test_feat, test_ret, seq_len=20)
print(f"LightGBM 数据形状:")
print(f" 训练: {X_train.shape}")
print(f" 测试: {X_test.shape}")
# Train LightGBM with early stopping on the validation set
lgb_train = lgb.Dataset(X_train, y_train)
lgb_val = lgb.Dataset(X_val, y_val, reference=lgb_train)
params = {
    'objective': 'regression',
    'metric': 'mse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1
}
gbm = lgb.train(
    params,
    lgb_train,
    num_boost_round=500,
    valid_sets=[lgb_train, lgb_val],
    callbacks=[lgb.early_stopping(10), lgb.log_evaluation(50)]
)
# Predict on the test set
lgb_pred = gbm.predict(X_test)
# Reshape to (n_samples, n_stocks): rows were stacked day-major
n_test_samples = len(test_dataset)
lgb_pred_reshaped = lgb_pred.reshape(n_test_samples, -1)
# Compute IC / Rank IC.
# Fix: pearsonr/spearmanr were only imported inside calculate_metrics, so
# the names were undefined at module level here -> NameError.
from scipy.stats import pearsonr, spearmanr
lgb_ic, _ = pearsonr(lgb_pred_reshaped.flatten(), test_true.flatten())
lgb_rank_ic, _ = spearmanr(lgb_pred_reshaped.flatten(), test_true.flatten())
print(f"\n模型对比 (测试集):")
print(f"{'指标':<15} {'LSTM+Attention':>15} {'LightGBM':>15}")
print("-" * 50)
print(f"{'IC':<15} {test_metrics['IC']:>15.6f} {lgb_ic:>15.6f}")
print(f"{'Rank IC':<15} {test_metrics['Rank IC']:>15.6f} {lgb_rank_ic:>15.6f}")
10. 可视化注意力权重
# Grab attention weights for one test batch
model.eval()
with torch.no_grad():
    x, y = next(iter(test_loader))
    x, y = x.to(device), y.to(device)
    predictions, attn_weights = model(x)
# Visualize the attention weights of the first sample / first stock
sample_idx = 0
stock_idx = 0
attn = attn_weights[sample_idx, stock_idx, :].cpu().numpy()
plt.figure(figsize=(10, 3))
plt.bar(range(20), attn)
plt.xlabel('Time Step (t-20 to t-1)')
plt.ylabel('Attention Weight')
plt.title(f'Attention Weights - Sample {sample_idx}, Stock {stock_idx}')
plt.grid(True)
plt.show()
print(f"最关注的时间步: {np.argmax(attn)} (t-{20-np.argmax(attn)})")
核心知识点总结
1. 完整项目流程
数据生成 → 预处理 → Dataset → 模型 → 训练 → 评估 → 回测
2. 量化数据处理
# 横截面标准化(每期每特征独立)
for t in range(n_days):
for f in range(n_features):
        features[t, :, f] = (features[t, :, f] - mean) / std
3. LSTM + Attention
# 结合 LSTM 序列建模和 Attention 关注重点
lstm_out, _ = self.lstm(x)
context, attn_weights = self.attention(last_hidden, lstm_out)
4. 评估指标
# IC: Pearson 相关系数
# Rank IC: Spearman 等级相关系数
# 分层回测: 按预测值分组,计算各组收益
结果解读和优化建议
结果解读
- IC > 0.03:模型有预测能力
- Rank IC > 0.05:排序能力强
- 分层单调性:Q1 < Q2 < Q3 < Q4 < Q5 说明模型有效
- 多空收益:Q5 - Q1 的年化收益和夏普比率
优化建议
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| IC 接近 0 | 特征不足、模型过拟合 | 增加特征、增强正则化 |
| 过拟合 | 模型太复杂、数据太少 | 减少 hidden_size、增加 dropout |
| 训练不稳定 | 学习率过大 | 降低学习率、使用 warmup |
| 泛化差 | 分布漂移大 | 滚动训练、使用更多历史数据 |
| 分层不单调 | 模型排序能力弱 | 使用 Rank IC Loss |
进一步优化方向
- 特征工程:添加更多技术指标、基本面因子
- 模型架构:尝试 Transformer、TCN 等架构
- 损失函数:使用 IC Loss、组合损失
- 训练策略:对抗训练、集成学习
- 风险控制:加入波动率预测、风险模型
项目代码总结
本项目展示了一个完整的量化深度学习流程,核心代码模块:
- 数据生成:
generate_quant_data() - 预处理:
preprocess_data() - 数据集:
QuantDataset - 模型:
LSTMAttentionModel - 训练:
train_epoch(),validate() - 评估:
calculate_metrics() - 回测:
backtest()
这些代码可以直接应用到实际量化项目中!