06-实战案例

本节通过一个完整的端到端项目,展示如何用深度学习(LSTM+Attention)进行量化时序预测,包括数据处理、模型构建、训练、评估和回测。

项目概述

本项目实现一个完整的量化深度学习预测系统:

项目流程:

1. 数据生成 ─→ 2. 预处理 ─→ 3. Dataset构建 ─→ 4. 模型定义 ─→ 5. 训练 ─→ 6. 评估 ─→ 7. 回测
              │              │              │            │           │          │          │
              ▼              ▼              ▼            ▼           ▼          ▼          ▼
           模拟数据        标准化        DataLoader   LSTM+Att   早停调度   IC/RankIC  分层回测

1. 数据生成

import numpy as np
import pandas as pd
import scipy.stats
 
def generate_quant_data(
    n_stocks=300,
    n_days=500,  # roughly 2 years of trading days
    n_features=20,
    noise_level=0.01,
    trend_strength=0.0002,
    momentum_signal_strength=0.3,
    mean_reversion_signal_strength=0.2,
    random_seed=42
):
    """
    Generate simulated quantitative-finance data.

    Mimics characteristics of real equity data:
    - price trend (random walk)
    - momentum effect
    - mean reversion
    - volatility clustering
    - cross-sectional correlation

    Args:
        n_stocks: number of stocks
        n_days: number of trading days
        n_features: number of features per stock per day
        noise_level: idiosyncratic noise level
        trend_strength: drift strength
        momentum_signal_strength: weight of the momentum signal
        mean_reversion_signal_strength: weight of the mean-reversion signal
        random_seed: random seed for reproducibility

    Returns:
        prices: (n_days, n_stocks) price data
        features: (n_days, n_stocks, n_features) feature data
        returns: (n_days, n_stocks) return data
    """
    np.random.seed(random_seed)

    # 1. Price process parameters (geometric-Brownian-motion style)
    dt = 1 / 252  # daily frequency
    drift = trend_strength * dt
    diffusion = noise_level * np.sqrt(dt)

    # "True" returns carrying momentum and mean-reversion signals
    true_returns = np.zeros((n_days, n_stocks))

    for t in range(1, n_days):
        # Momentum signal: continuation of the trailing 20-day mean return
        momentum_signal = np.mean(true_returns[max(0, t-20):t], axis=0) * momentum_signal_strength

        # Mean-reversion signal: pull back toward zero.
        # NOTE(review): despite the variable name, this reverts the 60-day
        # mean *return*, not a price deviation — presumably a deliberate
        # simplification; confirm if reused.
        if t >= 60:
            price_mean = np.mean(true_returns[t-60:t], axis=0)
            mean_reversion_signal = -price_mean * mean_reversion_signal_strength
        else:
            mean_reversion_signal = 0

        # Cross-sectional component: one market factor shared by all stocks
        market_factor = np.random.normal(0, 1) * 0.5

        # Combine the components into the day's cross-section of returns
        stock_specific = np.random.normal(0, 1, n_stocks)
        true_returns[t] = (
            drift +
            momentum_signal +
            mean_reversion_signal +
            market_factor * 0.3 +
            stock_specific * diffusion
        )

    # 2. Prices from cumulative compounded returns
    prices = np.cumprod(1 + true_returns, axis=0)
    prices = prices / prices[0] * 100  # normalize the starting price to 100

    # 3. Feature construction
    features = np.zeros((n_days, n_stocks, n_features))

    # Technical-indicator features (need a 60-day warm-up window)
    for t in range(60, n_days):
        past_returns = true_returns[t-60:t]

        # Momentum features
        features[t, :, 0] = np.mean(past_returns[-5:], axis=0)   # 5-day momentum
        features[t, :, 1] = np.mean(past_returns[-10:], axis=0)  # 10-day momentum
        features[t, :, 2] = np.mean(past_returns[-20:], axis=0)  # 20-day momentum

        # Volatility features
        features[t, :, 3] = np.std(past_returns[-5:], axis=0)    # 5-day volatility
        features[t, :, 4] = np.std(past_returns[-20:], axis=0)   # 20-day volatility

        # Price-position feature
        past_prices = prices[t-60:t]
        features[t, :, 5] = (prices[t] - np.mean(past_prices, axis=0)) / np.std(past_prices, axis=0)  # z-score of price vs 60-day window

        # RSI over the last 14 days
        gains = np.where(past_returns[-14:] > 0, past_returns[-14:], 0)
        losses = np.where(past_returns[-14:] < 0, -past_returns[-14:], 0)
        avg_gain = np.mean(gains, axis=0)
        avg_loss = np.mean(losses, axis=0)
        rs = avg_gain / np.where(avg_loss == 0, 1e-6, avg_loss)  # guard against division by zero
        features[t, :, 6] = 100 - 100 / (1 + rs)  # RSI

        # Volume proxy (simulated)
        features[t, :, 7] = np.abs(true_returns[t])  # absolute return stands in for volume change

        # Cross-sectional rank features
        features[t, :, 8] = scipy.stats.rankdata(features[t, :, 0]) / n_stocks  # momentum rank
        features[t, :, 9] = scipy.stats.rankdata(features[t, :, 3]) / n_stocks  # volatility rank

    # Backfill the 60 warm-up days with the mean of days 60-64
    for t in range(60):
        features[t] = np.mean(features[60:65], axis=0)

    # Remaining feature slots are filled with pure noise
    for i in range(10, n_features):
        features[:, :, i] = np.random.randn(n_days, n_stocks) * 0.1

    return prices, features, true_returns
 
# Generate the dataset
import scipy.stats
 
prices, features, returns = generate_quant_data(
    n_stocks=300,
    n_days=500,
    n_features=20,
    random_seed=42
)
 
print(f"数据形状:")
print(f"  价格: {prices.shape}")
print(f"  特征: {features.shape}")
print(f"  收益率: {returns.shape}")
 
# Visualize a few of the simulated price paths
import matplotlib.pyplot as plt
 
plt.figure(figsize=(12, 4))
plt.plot(prices[:, :10])
plt.title('模拟股票价格(前10只)')
plt.xlabel('交易日')
plt.ylabel('价格(归一化)')
plt.grid(True)
plt.show()

2. 数据预处理

def preprocess_data(features, returns, train_ratio=0.7, val_ratio=0.15):
    """
    Preprocess features and returns for model training.

    Steps:
    1. Cross-sectional standardization (each day × feature slice is
       normalized across stocks independently, so the model sees relative
       rather than absolute feature levels).
    2. Chronological train/val/test split (no shuffling, which would
       leak future information).

    BUG FIX: the cross-section assignment inside the loop was mis-indented
    (11 spaces instead of 12), which made the original an IndentationError.

    Args:
        features: (n_days, n_stocks, n_features) raw feature array.
        returns: (n_days, n_stocks) realized returns.
        train_ratio: fraction of days used for training.
        val_ratio: fraction of days used for validation; the remainder
            becomes the test set.

    Returns:
        ((train_features, val_features, test_features),
         (train_returns, val_returns, test_returns))
    """
    n_days = features.shape[0]
    train_end = int(n_days * train_ratio)
    val_end = int(n_days * (train_ratio + val_ratio))

    # 1. Cross-sectional standardization (per day, per feature)
    features_norm = np.zeros_like(features)

    for t in range(n_days):
        for f in range(features.shape[2]):
            cross_section = features[t, :, f]
            mean = np.mean(cross_section)
            std = np.std(cross_section)
            if std > 0:
                features_norm[t, :, f] = (cross_section - mean) / std
            # degenerate (zero-variance) slice: leave the zeros from init

    # 2. Chronological split
    train_features = features_norm[:train_end]
    val_features = features_norm[train_end:val_end]
    test_features = features_norm[val_end:]

    train_returns = returns[:train_end]
    val_returns = returns[train_end:val_end]
    test_returns = returns[val_end:]

    print("数据划分:")
    print(f"  训练集: {train_end} 天 ({train_ratio*100:.0f}%)")
    print(f"  验证集: {val_end - train_end} 天 ({val_ratio*100:.0f}%)")
    print(f"  测试集: {n_days - val_end} 天 ({(1-train_ratio-val_ratio)*100:.0f}%)")

    return (
        (train_features, val_features, test_features),
        (train_returns, val_returns, test_returns)
    )
 
# 预处理数据
(train_feat, val_feat, test_feat), (train_ret, val_ret, test_ret) = preprocess_data(
    features, returns, train_ratio=0.7, val_ratio=0.15
)
 
print(f"\n标准化后特征统计(训练集前1天):")
print(f"  均值: {np.mean(train_feat[0], axis=0)[:5]}")  # 前5个特征
print(f"  标准差: {np.std(train_feat[0], axis=0)[:5]}")

3. Dataset 和 DataLoader

import torch
from torch.utils.data import Dataset, DataLoader
 
class QuantDataset(Dataset):
    """Sliding-window dataset over daily cross-sectional data.

    Each sample pairs a ``seq_len``-day window of features with the
    cross-section of returns on the day immediately after the window.
    """

    def __init__(self, features, returns, seq_len=20, horizon=1):
        """
        Args:
            features: (n_days, n_stocks, n_features) array.
            returns: (n_days, n_stocks) array.
            seq_len: number of past days fed to the model.
            horizon: forecast horizon in days (only shrinks the sample
                count; see the note in __getitem__).
        """
        self.features = torch.tensor(features, dtype=torch.float32)
        self.returns = torch.tensor(returns, dtype=torch.float32)
        self.seq_len = seq_len
        self.horizon = horizon
        self.n_days, self.n_stocks, self.n_features = features.shape

    def __len__(self):
        # Each window must leave room for a label `horizon` steps ahead.
        return self.n_days - self.seq_len - self.horizon + 1

    def __getitem__(self, idx):
        window = self.features[idx:idx + self.seq_len]  # (seq_len, n_stocks, n_features)
        # NOTE(review): the label index ignores `horizon` beyond shrinking
        # __len__; for horizon > 1 this still returns the 1-step-ahead
        # cross-section — confirm that is intended.
        label = self.returns[idx + self.seq_len]  # (n_stocks,)
        return window, label
 
# Build the three datasets
seq_len = 20
horizon = 1
 
train_dataset = QuantDataset(train_feat, train_ret, seq_len, horizon)
val_dataset = QuantDataset(val_feat, val_ret, seq_len, horizon)
test_dataset = QuantDataset(test_feat, test_ret, seq_len, horizon)
 
# Build the DataLoaders
batch_size = 32
 
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=False,  # time-series data: keep chronological order
    num_workers=0,
    drop_last=True
)
 
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0
)
 
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0
)
 
print(f"数据集大小:")
print(f"  训练集: {len(train_dataset)} 样本, {len(train_loader)} 批次")
print(f"  验证集: {len(val_dataset)} 样本, {len(val_loader)} 批次")
print(f"  测试集: {len(test_dataset)} 样本, {len(test_loader)} 批次")
 
# Inspect one batch's shapes
for x, y in train_loader:
    print(f"\n批次形状:")
    print(f"  X: {x.shape}  # (batch, seq_len, n_stocks, n_features)")
    print(f"  y: {y.shape}  # (batch, n_stocks)")
    break

4. 模型定义(LSTM + Attention)

import torch
import torch.nn as nn
import torch.nn.functional as F
 
class BahdanauAttention(nn.Module):
    """Additive (Bahdanau) attention over the time dimension.

    Scores each encoder time step against a per-stock query state and
    returns the attention-weighted sum of the encoder outputs as a
    per-stock context vector.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Projects [query; encoder_output] into the energy space.
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        # Maps each energy vector to a scalar attention score.
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """
        Args:
            hidden: (batch, n_stocks, hidden_size) query state.
            encoder_outputs: (batch, seq_len, n_stocks, hidden_size).

        Returns:
            context: (batch, n_stocks, hidden_size)
            attn_weights: (batch, n_stocks, seq_len)
        """
        batch, seq_len, n_stocks, hidden_size = encoder_outputs.size()

        # Broadcast the query across every time step.
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1, 1)  # (batch, seq_len, n_stocks, hidden_size)

        # Additive attention energies.
        combined = torch.cat([hidden, encoder_outputs], dim=-1)  # (batch, seq_len, n_stocks, 2*hidden)
        energy = torch.tanh(self.attn(combined))  # (batch, seq_len, n_stocks, hidden)
        attention = self.v(energy).squeeze(-1)  # (batch, seq_len, n_stocks)

        # Normalize over the time dimension (seq_len).
        attn_weights = F.softmax(attention, dim=1)  # (batch, seq_len, n_stocks)

        # Weighted sum over seq_len (dim 's'), keeping the stock dim.
        # BUG FIX: the original einsum 'bsnh,bsn->bsh' summed over the
        # *stock* dim and produced (batch, seq_len, hidden), contradicting
        # the documented contract and crashing the downstream concat with
        # the (batch, n_stocks, hidden) query.
        context = torch.einsum('bsnh,bsn->bnh', encoder_outputs, attn_weights)

        return context, attn_weights.transpose(1, 2)  # (batch, n_stocks, seq_len)
 
class LSTMAttentionModel(nn.Module):
    """LSTM + attention model for cross-sectional return prediction.

    Each stock's feature sequence is encoded independently by a shared
    LSTM; attention then pools each stock's hidden states over time, and
    a small MLP head emits one score per stock.
    """

    def __init__(
        self,
        n_features,
        n_stocks,
        hidden_size=64,
        num_layers=2,
        dropout=0.2
    ):
        super().__init__()
        self.n_features = n_features
        self.n_stocks = n_stocks
        self.hidden_size = hidden_size

        # Input projection: maps each stock's raw features into the LSTM
        # input space, (n_stocks, n_features) → (n_stocks, hidden_size).
        self.input_projection = nn.Linear(n_features, hidden_size)

        # LSTM over the time dimension (weights shared across stocks).
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0  # PyTorch warns if dropout > 0 with one layer
        )

        # Temporal attention over the LSTM outputs.
        self.attention = BahdanauAttention(hidden_size)

        # Prediction head. NOTE: a ModuleList is iterated manually in
        # forward(); the nn.ReLU / nn.Dropout entries are applied in order
        # just like layers.
        self.output_layers = nn.ModuleList([
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, 1)
        ])

    def forward(self, x):
        """
        Args:
            x: (batch, seq_len, n_stocks, n_features)

        Returns:
            predictions: (batch, n_stocks) — one score per stock
            attn_weights: (batch, n_stocks, seq_len)
        """
        batch, seq_len, n_stocks, n_features = x.size()

        # Fold the stock dimension into the batch so the LSTM treats every
        # stock as an independent sequence.
        x = x.permute(0, 2, 1, 3)  # (batch, n_stocks, seq_len, n_features)
        x = x.reshape(batch * n_stocks, seq_len, n_features)

        # Project raw features into the hidden space.
        x = self.input_projection(x)  # (batch*n_stocks, seq_len, hidden_size)

        # Encode each per-stock sequence.
        lstm_out, (h_n, c_n) = self.lstm(x)  # lstm_out: (batch*n_stocks, seq_len, hidden_size)

        # Unfold back to separate batch / stock dimensions.
        lstm_out = lstm_out.reshape(batch, n_stocks, seq_len, self.hidden_size)
        lstm_out = lstm_out.permute(0, 2, 1, 3)  # (batch, seq_len, n_stocks, hidden_size)

        # Query = last time step's hidden state for each stock.
        last_hidden = lstm_out[:, -1]  # (batch, n_stocks, hidden_size)

        # Attention-pool the full sequence against that query.
        context, attn_weights = self.attention(last_hidden, lstm_out)

        # Concatenate query and context for the prediction head.
        combined = torch.cat([last_hidden, context], dim=-1)  # (batch, n_stocks, hidden_size*2)

        # Apply the MLP head layer by layer.
        for layer in self.output_layers:
            combined = layer(combined)

        predictions = combined.squeeze(-1)  # (batch, n_stocks)

        return predictions, attn_weights
 
# 创建模型
model = LSTMAttentionModel(
    n_features=20,
    n_stocks=300,
    hidden_size=64,
    num_layers=2,
    dropout=0.2
)
 
# 测试前向传播
x, y = next(iter(train_loader))
with torch.no_grad():
    predictions, attn_weights = model(x)
 
print(f"模型输出:")
print(f"  预测: {predictions.shape}")  # (batch, n_stocks)
print(f"  注意力权重: {attn_weights.shape}")  # (batch, n_stocks, seq_len)
 
# 计算参数量
total_params = sum(p.numel() for p in model.parameters())
print(f"\n模型参数量: {total_params:,}")

5. 训练

import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler  # NOTE(review): imported but unused below — AMP is never enabled
 
# Device selection: GPU if available, otherwise CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
 
# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
 
# LR scheduler: halve the learning rate when validation loss plateaus
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-6
)
 
# 早停
class EarlyStopping:
    """Stop training when validation loss stops improving.

    Tracks the best (lowest) validation loss seen so far, counts
    consecutive non-improving epochs, and snapshots the best model's
    weights so they can be restored after training.
    """

    def __init__(self, patience=10, min_delta=0):
        """
        Args:
            patience: consecutive non-improving calls allowed before
                `early_stop` is set.
            min_delta: minimum improvement required to reset the counter.
        """
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        # BUG FIX: __call__ reads self.best_score, but the original
        # __init__ only defined self.best_loss — the first call raised
        # AttributeError.
        self.best_score = None
        self.early_stop = False
        self.best_model_state = None

    def __call__(self, val_loss, model):
        # Higher score is better (score is the negated loss).
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0

    def save_checkpoint(self, model):
        # BUG FIX: dict.copy() is shallow — the stored tensors kept being
        # updated by subsequent training steps. Clone each tensor so the
        # snapshot is frozen at the best epoch.
        self.best_model_state = {
            k: v.detach().clone() for k, v in model.state_dict().items()
        }
 
early_stopping = EarlyStopping(patience=10)
 
# 训练循环
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one training pass over `dataloader` and return the mean batch loss."""
    model.train()
    running_loss = 0.0

    for batch_x, batch_y in dataloader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Forward pass (the model also returns attention weights; unused here).
        preds, _ = model(batch_x)
        batch_loss = criterion(preds, batch_y)

        # Backward pass with gradient clipping to stabilize training.
        optimizer.zero_grad()
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(dataloader)
 
def validate(model, dataloader, criterion, device):
    """Return the mean batch loss over `dataloader` without updating weights."""
    model.eval()
    running_loss = 0.0

    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            preds, _ = model(batch_x.to(device))
            running_loss += criterion(preds, batch_y.to(device)).item()

    return running_loss / len(dataloader)
 
# Main training loop
epochs = 50
train_losses = []
val_losses = []
 
print("开始训练...")
for epoch in range(epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss = validate(model, val_loader, criterion, device)
 
    train_losses.append(train_loss)
    val_losses.append(val_loss)
 
    # ReduceLROnPlateau is driven by the validation loss
    scheduler.step(val_loss)
 
    print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")
 
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch+1}")
        break
 
# Restore the best checkpoint found during training
model.load_state_dict(early_stopping.best_model_state)
 
# Plot loss curves
import matplotlib.pyplot as plt
 
plt.figure(figsize=(10, 4))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)
plt.show()

6. 评估

def evaluate_model(model, dataloader, device):
    """Run inference over `dataloader`; return (predictions, targets) as numpy arrays."""
    model.eval()
    pred_chunks, target_chunks = [], []

    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            batch_preds, _ = model(batch_x)
            pred_chunks.append(batch_preds.cpu())
            target_chunks.append(batch_y.cpu())

    predictions = torch.cat(pred_chunks, dim=0).numpy()
    targets = torch.cat(target_chunks, dim=0).numpy()
    return predictions, targets
 
# 计算评估指标
def calculate_metrics(predictions, targets):
    """计算评估指标"""
    from scipy.stats import pearsonr, spearmanr
 
    # Flatten
    pred_flat = predictions.flatten()
    true_flat = targets.flatten()
 
    # MSE, MAE
    mse = np.mean((pred_flat - true_flat) ** 2)
    mae = np.mean(np.abs(pred_flat - true_flat))
 
    # IC (Pearson correlation)
    ic, ic_pvalue = pearsonr(pred_flat, true_flat)
 
    # Rank IC (Spearman correlation)
    rank_ic, rank_ic_pvalue = spearmanr(pred_flat, true_flat)
 
    return {
        'MSE': mse,
        'MAE': mae,
        'IC': ic,
        'IC_pvalue': ic_pvalue,
        'Rank IC': rank_ic,
        'Rank IC_pvalue': rank_ic_pvalue
    }
 
# Collect predictions on all three splits
train_pred, train_true = evaluate_model(model, train_loader, device)
val_pred, val_true = evaluate_model(model, val_loader, device)
test_pred, test_true = evaluate_model(model, test_loader, device)
 
# Compute metrics per split
train_metrics = calculate_metrics(train_pred, train_true)
val_metrics = calculate_metrics(val_pred, val_true)
test_metrics = calculate_metrics(test_pred, test_true)
 
# Print a comparison table
print("\n评估指标:")
print(f"{'指标':<15} {'训练集':>12} {'验证集':>12} {'测试集':>12}")
print("-" * 55)
for metric in ['MSE', 'MAE', 'IC', 'Rank IC']:
    print(f"{metric:<15} {train_metrics[metric]:>12.6f} {val_metrics[metric]:>12.6f} {test_metrics[metric]:>12.6f}")

7. 分层回测

def backtest(predictions, targets, n_quantiles=5):
    """
    Quantile (layered) backtest.

    For each day, stocks are bucketed into `n_quantiles` groups by
    predicted score, and each group's realized mean return is recorded.

    Args:
        predictions: (n_samples, n_stocks) model scores.
        targets: (n_samples, n_stocks) realized returns.
        n_quantiles: number of buckets.

    Returns:
        portfolio_returns: (n_samples, n_quantiles) mean return per bucket.
    """
    n_samples = predictions.shape[0]
    portfolio_returns = np.zeros((n_samples, n_quantiles))

    for day, (day_pred, day_ret) in enumerate(zip(predictions, targets)):
        # Bucket assignment by predicted score (0 = lowest quantile).
        buckets = pd.qcut(day_pred, n_quantiles, labels=False, duplicates='drop')

        for q in range(n_quantiles):
            in_bucket = buckets == q
            if in_bucket.sum() > 0:
                portfolio_returns[day, q] = day_ret[in_bucket].mean()

    return portfolio_returns
 
# Run the layered backtest on the test split
test_portfolio_returns = backtest(test_pred, test_true, n_quantiles=5)
 
# Compound daily returns into cumulative returns
cumulative_returns = np.cumprod(1 + test_portfolio_returns, axis=0) - 1
 
# Plot per-quantile performance
plt.figure(figsize=(12, 5))
 
# Cumulative return curves
plt.subplot(1, 2, 1)
for q in range(5):
    plt.plot(cumulative_returns[:, q] * 100, label=f'Q{q+1}')
plt.xlabel('Days')
plt.ylabel('Cumulative Returns (%)')
plt.title('分层累计收益')
plt.legend()
plt.grid(True)
 
# Mean daily return per quantile
plt.subplot(1, 2, 2)
avg_returns = test_portfolio_returns.mean(axis=0) * 10000  # convert to basis points
plt.bar(range(5), avg_returns)
plt.xlabel('Quantile')
plt.ylabel('Avg Daily Return (bps)')
plt.title('各层平均日收益')
plt.grid(True)
 
plt.tight_layout()
plt.show()
 
# Long-short portfolio: top quantile minus bottom quantile
long_short_return = test_portfolio_returns[:, -1] - test_portfolio_returns[:, 0]
long_short_cumulative = np.cumprod(1 + long_short_return) - 1
 
print(f"\n多空策略 (Q5 - Q1):")
print(f"  累计收益: {long_short_cumulative[-1]*100:.2f}%")
print(f"  年化收益: {(1+long_short_cumulative[-1])**(252/len(long_short_return))-1:.2%}")
print(f"  夏普比率: {long_short_return.mean()/long_short_return.std()*np.sqrt(252):.2f}")
print(f"  胜率: {(long_short_return > 0).mean():.2%}")

8. 滚动训练

def rolling_train(
    model_class,
    data_dict,
    window_size=350,
    retrain_freq=50,  # retrain every 50 days
    **model_kwargs
):
    """
    Rolling (walk-forward) training.

    Simulates live trading: every `retrain_freq` days a fresh model is
    trained on the trailing `window_size` days and then used to predict
    the next `retrain_freq` days.

    Args:
        model_class: model constructor, e.g. LSTMAttentionModel.
        data_dict: {'features': (n_days, n_stocks, n_features),
                    'returns': (n_days, n_stocks)} raw (unstandardized) data.
        window_size: length of the trailing training window in days.
        retrain_freq: days between retrains (also the prediction span).
        **model_kwargs: forwarded to `model_class`.

    Returns:
        np.ndarray of per-day cross-sectional predictions, in time order.
    """
    all_features, all_returns = data_dict['features'], data_dict['returns']
    n_days = len(all_features)

    predictions_list = []

    # First prediction day: needs a full training window behind it.
    start = window_size

    while start + retrain_freq < n_days:
        end = min(start + retrain_freq, n_days)

        print(f"\n训练区间: [{max(0, start-window_size)}, {start}), 预测区间: [{start}, {end})")

        # BUG FIX: numpy slicing returns a *view*; the original standardized
        # it in place, silently mutating the caller's feature array and
        # re-standardizing overlapping windows on every retrain. Copy first.
        train_feat = all_features[max(0, start-window_size):start].copy()
        train_ret = all_returns[max(0, start-window_size):start]

        # Cross-sectional standardization (per day, per feature).
        for t in range(len(train_feat)):
            for f in range(train_feat.shape[2]):
                cross_section = train_feat[t, :, f]
                mean = np.mean(cross_section)
                std = np.std(cross_section)
                if std > 0:
                    train_feat[t, :, f] = (cross_section - mean) / std

        # Build the training pipeline for this window.
        train_dataset = QuantDataset(train_feat, train_ret, seq_len=20, horizon=1)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)

        # Fresh model per window (no warm-starting across retrains).
        model = model_class(**model_kwargs).to(device)
        criterion = nn.MSELoss()
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

        # Short training budget: rolling retrains use few epochs.
        for epoch in range(10):
            train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
            if (epoch + 1) % 5 == 0:
                print(f"  Epoch {epoch+1}/10, Loss: {train_loss:.6f}")

        # Predict the out-of-sample span [start, end).
        model.eval()
        with torch.no_grad():
            for t in range(start, end):
                if t >= 20:
                    # Standardize the 20-day input window (on a copy).
                    x = all_features[t-20:t].copy()
                    for i in range(20):
                        for f in range(x.shape[2]):
                            cross_section = x[i, :, f]
                            mean = np.mean(cross_section)
                            std = np.std(cross_section)
                            if std > 0:
                                x[i, :, f] = (cross_section - mean) / std

                    x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(0).to(device)
                    pred, _ = model(x_tensor)
                    predictions_list.append(pred.squeeze(0).cpu().numpy())

        start = end

    return np.array(predictions_list)
 
# 执行滚动训练(简化版,只演示概念)
# rolling_predictions = rolling_train(
#     LSTMAttentionModel,
#     data_dict={'features': features, 'returns': returns},
#     window_size=350,
#     retrain_freq=50,
#     n_features=20,
#     n_stocks=300,
#     hidden_size=64,
#     num_layers=2,
#     dropout=0.2
# )

9. 树模型 vs 深度学习对比

# 使用 LightGBM 作为对比
import lightgbm as lgb
from sklearn.metrics import mean_squared_error
 
# 准备 LightGBM 数据
def prepare_lgb_data(features, returns, seq_len=20):
    """Flatten rolling feature windows into tabular (X, y) for LightGBM.

    Each row of X is one stock's last `seq_len` days of features
    concatenated day by day; y is that stock's return on the day after
    the window.
    """
    n_stocks = features.shape[1]
    X_parts, y_parts = [], []

    for t in range(seq_len, len(features)):
        window = features[t-seq_len:t]  # (seq_len, n_stocks, n_features)
        # One flat row per stock: (n_stocks, seq_len * n_features)
        X_parts.append(window.transpose(1, 0, 2).reshape(n_stocks, -1))
        y_parts.append(returns[t])  # label: day-t cross-section of returns

    return np.vstack(X_parts), np.hstack(y_parts)
 
# Prepare tabular data for LightGBM.
# BUG FIX: pearsonr/spearmanr are used below but were previously only
# imported *inside* calculate_metrics(), so this section raised NameError.
from scipy.stats import pearsonr, spearmanr

X_train, y_train = prepare_lgb_data(train_feat, train_ret, seq_len=20)
X_val, y_val = prepare_lgb_data(val_feat, val_ret, seq_len=20)
X_test, y_test = prepare_lgb_data(test_feat, test_ret, seq_len=20)

print(f"LightGBM 数据形状:")
print(f"  训练: {X_train.shape}")
print(f"  测试: {X_test.shape}")

# Train LightGBM with early stopping on the validation set
lgb_train = lgb.Dataset(X_train, y_train)
lgb_val = lgb.Dataset(X_val, y_val, reference=lgb_train)

params = {
    'objective': 'regression',
    'metric': 'mse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1
}

gbm = lgb.train(
    params,
    lgb_train,
    num_boost_round=500,
    valid_sets=[lgb_train, lgb_val],
    callbacks=[lgb.early_stopping(10), lgb.log_evaluation(50)]
)

# Predict on the test set
lgb_pred = gbm.predict(X_test)

# Reshape back to (n_samples, n_stocks) so rows align with test_true
n_test_samples = len(test_dataset)
lgb_pred_reshaped = lgb_pred.reshape(n_test_samples, -1)

# IC / Rank IC for LightGBM
lgb_ic, _ = pearsonr(lgb_pred_reshaped.flatten(), test_true.flatten())
lgb_rank_ic, _ = spearmanr(lgb_pred_reshaped.flatten(), test_true.flatten())

print(f"\n模型对比 (测试集):")
print(f"{'指标':<15} {'LSTM+Attention':>15} {'LightGBM':>15}")
print("-" * 50)
print(f"{'IC':<15} {test_metrics['IC']:>15.6f} {lgb_ic:>15.6f}")
print(f"{'Rank IC':<15} {test_metrics['Rank IC']:>15.6f} {lgb_rank_ic:>15.6f}")

10. 可视化注意力权重

# Grab attention weights for one test batch
model.eval()
with torch.no_grad():
    x, y = next(iter(test_loader))
    x, y = x.to(device), y.to(device)
    predictions, attn_weights = model(x)
 
# Visualize the first sample's first stock
sample_idx = 0
stock_idx = 0
 
attn = attn_weights[sample_idx, stock_idx, :].cpu().numpy()
 
plt.figure(figsize=(10, 3))
plt.bar(range(20), attn)
plt.xlabel('Time Step (t-20 to t-1)')
plt.ylabel('Attention Weight')
plt.title(f'Attention Weights - Sample {sample_idx}, Stock {stock_idx}')
plt.grid(True)
plt.show()
 
print(f"最关注的时间步: {np.argmax(attn)} (t-{20-np.argmax(attn)})")

核心知识点总结

1. 完整项目流程

数据生成 → 预处理 → Dataset → 模型 → 训练 → 评估 → 回测

2. 量化数据处理

# 横截面标准化(每期每特征独立)
for t in range(n_days):
    for f in range(n_features):
        features[t, :, f] = (features[t, :, f] - mean) / std

3. LSTM + Attention

# 结合 LSTM 序列建模和 Attention 关注重点
lstm_out, _ = self.lstm(x)
context, attn_weights = self.attention(last_hidden, lstm_out)

4. 评估指标

# IC: Pearson 相关系数
# Rank IC: Spearman 等级相关系数
# 分层回测: 按预测值分组,计算各组收益

结果解读和优化建议

结果解读

  1. IC > 0.03:模型有预测能力
  2. Rank IC > 0.05:排序能力强
  3. 分层单调性:Q1 < Q2 < Q3 < Q4 < Q5 说明模型有效
  4. 多空收益:Q5 - Q1 的年化收益和夏普比率

优化建议

| 问题 | 可能原因 | 解决方案 |
|------|----------|----------|
| IC 接近 0 | 特征不足、模型过拟合 | 增加特征、增强正则化 |
| 过拟合 | 模型太复杂、数据太少 | 减少 hidden_size、增加 dropout |
| 训练不稳定 | 学习率过大 | 降低学习率、使用 warmup |
| 泛化差 | 分布漂移大 | 滚动训练、使用更多历史数据 |
| 分层不单调 | 模型排序能力弱 | 使用 Rank IC Loss |

进一步优化方向

  1. 特征工程:添加更多技术指标、基本面因子
  2. 模型架构:尝试 Transformer、TCN 等架构
  3. 损失函数:使用 IC Loss、组合损失
  4. 训练策略:对抗训练、集成学习
  5. 风险控制:加入波动率预测、风险模型

项目代码总结

本项目展示了一个完整的量化深度学习流程,核心代码模块:

  1. 数据生成:`generate_quant_data()`
  2. 预处理:`preprocess_data()`
  3. 数据集:`QuantDataset`
  4. 模型:`LSTMAttentionModel`
  5. 训练:`train_epoch()`, `validate()`
  6. 评估:`calculate_metrics()`
  7. 回测:`backtest()`

这些代码可以直接应用到实际量化项目中!