实战案例

目录


1. 项目概述

1.1 目标

构建一个完整的量化选股模型,使用 LightGBM 预测股票未来收益。

1.2 流程图

数据生成 → 特征工程 → 时序划分 → 模型训练 → 模型评估 → 特征分析 → 滚动训练
    ↓         ↓         ↓         ↓         ↓         ↓         ↓
  500股票   10个因子   Walk-Forward  LightGBM   IC/分层    SHAP    月度重训
   ×3年      标准化    + Purging   +Optuna    分析      分析

1.3 环境准备

# Ensure all required dependencies are installed.
import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
import shap
import warnings
from sklearn.metrics import mean_squared_error
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
 
# Silence library warnings for cleaner notebook output.
warnings.filterwarnings('ignore')
# Fonts capable of rendering CJK characters in matplotlib figures.
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei']
# Keep the minus sign renderable while a CJK font is active.
plt.rcParams['axes.unicode_minus'] = False
 
print("环境检查完成!")
print(f"NumPy: {np.__version__}")
print(f"Pandas: {pd.__version__}")
print(f"LightGBM: {lgb.__version__}")

2. 数据生成

2.1 生成模拟股票数据

def generate_stock_data(n_stocks=500, n_days=750, seed=42):
    """
    Generate a simulated stock panel.

    Parameters:
        n_stocks: number of stocks
        n_days: number of trading days (~3 years)
        seed: random seed for reproducibility

    Returns:
        df: DataFrame with 'date', 'stock', ten factor columns and a
        'return' column. The true return signal is driven by momentum,
        reversal, volatility, ROE and log market cap, plus noise and a
        mild time trend; the remaining factors are pure noise.
    """
    np.random.seed(seed)

    # Business-day calendar and zero-padded ticker symbols.
    trading_days = pd.date_range('2021-01-01', periods=n_days, freq='B')
    tickers = [f'STOCK{i:04d}' for i in range(n_stocks)]

    # One row per (day, ticker), in day-major order.
    frame = pd.DataFrame(
        [{'date': day, 'stock': ticker} for day in trading_days for ticker in tickers]
    )

    n_rows = len(frame)

    # Factor columns. The draws are evaluated in this fixed order so the
    # random stream — and hence the data — is identical for a given seed.
    factor_draws = [
        ('momentum_20', np.random.randn(n_rows) * 0.1),          # 20-day momentum
        ('reversal_5', -np.random.randn(n_rows) * 0.05),         # 5-day reversal
        ('volatility_20', np.random.rand(n_rows) * 0.5 + 0.1),   # 20-day volatility
        ('turnover', np.random.rand(n_rows) * 0.2 + 0.01),       # turnover ratio
        ('pe_inverse', np.random.randn(n_rows) * 0.02 + 0.05),   # earnings yield (1/PE)
        ('log_market_cap', np.random.randn(n_rows) * 0.5 + 10),  # log market cap
        ('roe', np.random.randn(n_rows) * 0.05 + 0.1),           # quality (ROE)
        ('rsi', np.random.rand(n_rows) * 100),                   # RSI
        ('volume_change', np.random.randn(n_rows) * 0.3),        # volume change
        ('sector', np.random.randint(0, 10, n_rows)),            # sector code (0-9)
    ]
    for name, values in factor_draws:
        frame[name] = values

    # True signal: only momentum, reversal, volatility, ROE and size matter.
    alpha = (
        0.30 * frame['momentum_20'].values
        - 0.20 * frame['reversal_5'].values
        - 0.15 * (frame['volatility_20'].values - 0.3)
        + 0.10 * (frame['roe'].values - 0.1)
        + 0.05 * frame['log_market_cap'].values
    )

    # Mild drift over time. NOTE(review): rows are date-major, so np.repeat
    # (not np.tile) would align one drift value per day; preserved as-is to
    # keep the simulated data identical.
    drift = np.tile(np.linspace(0, 0.5, n_days), n_stocks)

    frame['return'] = alpha * 0.5 + np.random.randn(n_rows) * 0.3 + drift * 0.001

    # Knock out ~2% of turnover values to exercise the imputation step later.
    nan_rows = np.random.rand(n_rows) < 0.02
    frame.loc[nan_rows, 'turnover'] = np.nan

    return frame
 
# Generate the simulated dataset used throughout the rest of the notebook.
df = generate_stock_data(n_stocks=500, n_days=750)
 
print("数据生成完成!")
print(f"总样本数: {len(df)}")
# Separator between the two dates was missing, producing a fused string.
print(f"时间范围: {df['date'].min()} → {df['date'].max()}")
print(f"股票数量: {df['stock'].nunique()}")
print(f"\n数据预览:")
print(df.head(10))

2.2 数据探索

# Return distribution: histogram and daily-mean time series, side by side.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
 
# Histogram of per-sample returns with the overall mean marked.
axes[0].hist(df['return'], bins=50, color='steelblue', alpha=0.7, edgecolor='black')
axes[0].axvline(df['return'].mean(), color='red', linestyle='--', linewidth=2, label=f"均值: {df['return'].mean():.4f}")
axes[0].set_xlabel('Return')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Return Distribution')
axes[0].legend()
axes[0].grid(True, alpha=0.3, axis='y')
 
# Cross-sectional average return per trading day.
daily_return = df.groupby('date')['return'].mean()
axes[1].plot(daily_return.index, daily_return.values, color='steelblue', linewidth=1)
axes[1].axhline(0, color='black', linestyle='--', linewidth=0.5)
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Average Return')
axes[1].set_title('Daily Average Return')
axes[1].grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()
 
# Factor columns used as model features for the rest of the notebook
# (note: the 'sector' column is deliberately excluded here).
feature_cols = ['momentum_20', 'reversal_5', 'volatility_20', 'turnover',
                'pe_inverse', 'log_market_cap', 'roe', 'rsi', 'volume_change']
 
print("\n特征统计:")
print(df[feature_cols].describe().round(4))

3. 特征工程

3.1 特征标准化

def standardize_features(df, feature_cols, method='zscore'):
    """
    Cross-sectionally standardize factor columns.

    Parameters:
        df: DataFrame with a 'date' column and the factor columns
        feature_cols: list of column names to standardize
        method: 'zscore' (per-date mean/std) or 'rank' (per-date percentile rank)

    Returns:
        A new DataFrame with the factor columns standardized within each
        trading day. Statistics never mix dates, so no future information
        leaks into past rows. (The original comment claimed a "rolling
        window" — the computation is and was purely cross-sectional.)
    """
    df = df.copy()

    if method == 'zscore':
        # Per-date z-score: (x - daily mean) / (daily std + eps).
        # groupby(...).transform keeps index alignment and replaces the
        # previous per-column merge/rename/drop round-trip.
        for col in feature_cols:
            daily_mean = df.groupby('date')[col].transform('mean')
            daily_std = df.groupby('date')[col].transform('std')
            df[col] = (df[col] - daily_mean) / (daily_std + 1e-8)

    elif method == 'rank':
        # Percentile rank within each date, mapped to [-1, 1]; more robust
        # to outliers than z-scoring.
        for col in feature_cols:
            df[col] = df.groupby('date')[col].rank(pct=True)
            df[col] = (df[col] - 0.5) * 2

    return df
 
# Apply per-date z-score standardization to all factor columns.
df_processed = standardize_features(df, feature_cols, method='zscore')
 
print("特征标准化完成")
print(df_processed[feature_cols].describe().round(4))

3.2 处理缺失值

# Missing-value handling
def handle_missing_values(df, feature_cols):
    """
    Fill missing factor values.

    Strategy: fill each NaN with that trading day's cross-sectional
    median; any cell still missing afterwards (e.g. a column entirely
    NaN on some day) falls back to the column's global median.

    Parameters:
        df: DataFrame with a 'date' column and the factor columns
        feature_cols: columns to impute

    Returns:
        A new DataFrame; the caller's frame is left unmodified (the
        previous version filled the input in place).
    """
    df = df.copy()  # avoid mutating the caller's frame

    for col in feature_cols:
        daily_median = df.groupby('date')[col].transform('median')
        df[col] = df[col].fillna(daily_median)

    # Fallback for days where an entire column had no observations.
    remaining_nulls = df[feature_cols].isnull().sum()
    if remaining_nulls.sum() > 0:
        print("警告:仍有缺失值,用全局中位数填充")
        df[feature_cols] = df[feature_cols].fillna(df[feature_cols].median())

    return df
 
# Impute remaining NaNs (daily median with global-median fallback).
df_processed = handle_missing_values(df_processed, feature_cols)
print(f"\n缺失值检查: {df_processed[feature_cols].isnull().sum().sum()}")

4. 时序划分

4.1 Walk-Forward 划分

def walk_forward_split(df, train_size=1.5, val_size=0.25, test_size=0.25):
    """
    Walk-forward time-series split.

    Parameters:
        df: DataFrame with a 'date' column
        train_size: training window length in years (252 trading days/year)
        val_size: validation window length in years
        test_size: test window length in years

    Returns:
        splits: list of dicts with 'train'/'val'/'test' DataFrames and
        'dates' = (first train date, last test date) for the period.

    NOTE(review): despite the overview mentioning "Purging", no gap is
    inserted between the train/val/test windows here — confirm whether
    an embargo period is needed for the target's forward horizon.
    """
    df = df.sort_values('date').reset_index(drop=True)

    # Hoist the sorted unique-date array: the previous version recomputed
    # df['date'].unique() three times per loop iteration.
    unique_dates = df['date'].unique()
    total_days = len(unique_dates)
    train_days = int(train_size * 252)
    val_days = int(val_size * 252)
    test_days = int(test_size * 252)

    splits = []
    current_test_start = train_days + val_days

    while current_test_start + test_days <= total_days:
        train_end = current_test_start - val_days
        train_start = max(0, train_end - train_days)
        test_end = min(current_test_start + test_days, total_days)

        train_dates = unique_dates[train_start:train_end]
        val_dates = unique_dates[train_end:current_test_start]
        test_dates = unique_dates[current_test_start:test_end]

        splits.append({
            'train': df[df['date'].isin(train_dates)],
            'val': df[df['date'].isin(val_dates)],
            'test': df[df['date'].isin(test_dates)],
            'dates': (train_dates[0], test_dates[-1])
        })

        # Slide the whole scheme forward by one test window.
        current_test_start += test_days

    return splits
 
# Build the walk-forward splits and show the first few periods.
splits = walk_forward_split(df_processed, train_size=1.5, val_size=0.25, test_size=0.25)
 
print(f"生成 {len(splits)} 个 Walk-Forward 周期")
 
for i, split in enumerate(splits[:3]):
    print(f"\n周期 {i+1}:")
    # Range separators were missing, fusing the start/end dates together.
    print(f"  训练: {split['dates'][0]} → {split['train']['date'].max()} ({len(split['train']['date'].unique())} 天)")
    print(f"  验证: {split['val']['date'].min()} → {split['val']['date'].max()} ({len(split['val']['date'].unique())} 天)")
    print(f"  测试: {split['test']['date'].min()} → {split['dates'][1]} ({len(split['test']['date'].unique())} 天)")

4.2 使用第一个周期进行初始训练

# Demonstrate training on the first walk-forward split.
split = splits[0]
 
train_df = split['train']
val_df = split['val']
test_df = split['test']
 
# Feature matrices / target vectors as NumPy arrays for LightGBM.
X_train = train_df[feature_cols].values
y_train = train_df['return'].values
 
X_val = val_df[feature_cols].values
y_val = val_df['return'].values
 
X_test = test_df[feature_cols].values
y_test = test_df['return'].values
 
print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")

5. 模型训练与调优

5.1 基础模型训练

# Baseline LightGBM training helper
def train_lgb_model(X_train, y_train, X_val, y_val, params=None):
    """Fit a LightGBM regressor with early stopping on the validation set."""
    default_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'max_depth': 6,
        'num_leaves': 31,
        'learning_rate': 0.05,
        'min_child_samples': 50,
        'bagging_fraction': 0.8,
        'feature_fraction': 0.8,
        'bagging_freq': 1,
        'lambda_l1': 0.1,
        'lambda_l2': 1.0,
        'verbose': -1
    }
    if params is None:
        params = default_params

    dtrain = lgb.Dataset(X_train, label=y_train)
    dvalid = lgb.Dataset(X_val, label=y_val, reference=dtrain)

    # Up to 2000 rounds; stop once validation RMSE has not improved for
    # 100 consecutive rounds, logging progress every 200 rounds.
    booster = lgb.train(
        params,
        dtrain,
        num_boost_round=2000,
        valid_sets=[dtrain, dvalid],
        callbacks=[
            lgb.early_stopping(stopping_rounds=100),
            lgb.log_evaluation(period=200),
        ],
    )

    return booster
 
# Fit a baseline model with the default hyperparameters.
model = train_lgb_model(X_train, y_train, X_val, y_val)
 
print(f"\n最佳迭代数: {model.best_iteration}")

5.2 Optuna 超参数调优

def objective_lgb(trial, X_train, y_train, X_val, y_val):
    """
    Optuna objective: train LightGBM with sampled hyperparameters and
    return the validation MSE (to be minimized).
    """
    params = {
        'objective': 'regression',
        'metric': 'rmse',
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'num_leaves': trial.suggest_int('num_leaves', 15, 127),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
        'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
        'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
        # bagging_fraction only takes effect when bagging_freq > 0; without
        # this line the sampled bagging_fraction was silently a no-op.
        'bagging_freq': 1,
        'lambda_l1': trial.suggest_float('lambda_l1', 0, 1),
        'lambda_l2': trial.suggest_float('lambda_l2', 0, 10),
        'verbose': -1
    }

    train_data = lgb.Dataset(X_train, label=y_train)
    valid_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

    model = lgb.train(
        params,
        train_data,
        num_boost_round=1000,
        valid_sets=[valid_data],
        # period=0 disables per-round logging (the previous period=False
        # only worked because bool is an int subclass).
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
    )

    # Score on the validation fold at the early-stopped iteration.
    y_pred = model.predict(X_val, num_iteration=model.best_iteration)
    return mean_squared_error(y_val, y_pred)
 
# Run Optuna (a small number of trials for demonstration).
print("开始超参数优化...")
 
study = optuna.create_study(direction='minimize')
study.optimize(lambda trial: objective_lgb(trial, X_train, y_train, X_val, y_val),
              n_trials=20, show_progress_bar=True)
 
print(f"\n最佳参数: {study.best_params}")
print(f"最佳验证 MSE: {study.best_value:.4f}")
 
# study.best_params holds only the *sampled* keys — restore the fixed
# objective/metric/verbosity settings before the final fit, otherwise
# LightGBM falls back to its defaults (l2 metric, verbose logging).
final_params = {'objective': 'regression', 'metric': 'rmse', 'verbose': -1,
                **study.best_params}
best_model = train_lgb_model(X_train, y_train, X_val, y_val, params=final_params)

6. 模型评估

6.1 IC 分析

def calculate_ic(predictions, returns):
    """Pearson IC between predictions and realized returns (NaNs dropped pairwise)."""
    valid = ~np.isnan(predictions) & ~np.isnan(returns)
    return np.corrcoef(predictions[valid], returns[valid])[0, 1]
 
# Predict on all three sets with the tuned model.
y_pred_train = best_model.predict(X_train)
y_pred_val = best_model.predict(X_val)
y_pred_test = best_model.predict(X_test)
 
# Pearson IC per set; a large train/test gap indicates overfitting.
ic_train = calculate_ic(y_pred_train, y_train)
ic_val = calculate_ic(y_pred_val, y_val)
ic_test = calculate_ic(y_pred_test, y_test)
 
print("=== IC 分析 ===")
print(f"训练集 IC:  {ic_train:.4f}")
print(f"验证集 IC:  {ic_val:.4f}")
print(f"测试集 IC:  {ic_test:.4f}")
 
# Rank IC (Spearman): robust to monotone transformations of the scores.
from scipy.stats import spearmanr
 
rank_ic_test = spearmanr(y_pred_test, y_test)[0]
print(f"测试集 Rank IC: {rank_ic_test:.4f}")

6.2 分层回测

def quantile_backtest(predictions, returns, n_quantiles=5):
    """
    Sort samples into prediction quantiles and compare realized returns.

    Parameters:
        predictions: model scores
        returns: realized returns aligned with predictions
        n_quantiles: number of quantile buckets

    Returns:
        (group_df, ls_return): per-group statistics (group 1 = lowest
        predictions) and the top-minus-bottom ("long-short") mean return.
    """
    data = pd.DataFrame({
        'prediction': predictions,
        'return': returns
    }).dropna()

    # qcut may merge buckets when bin edges are duplicated (duplicates='drop').
    data['group'] = pd.qcut(data['prediction'], n_quantiles, labels=False, duplicates='drop')

    # Iterate the groups actually present instead of assuming n_quantiles of
    # them — the previous fixed-range loop emitted all-NaN rows for merged
    # buckets, which could poison the long/short extremes below.
    group_stats = []
    for g, group_data in data.groupby('group', sort=True):
        group_stats.append({
            'group': int(g) + 1,
            'n': len(group_data),
            'mean_pred': group_data['prediction'].mean(),
            'mean_return': group_data['return'].mean(),
            'std_return': group_data['return'].std()
        })

    group_df = pd.DataFrame(group_stats)

    # Long the top-prediction bucket, short the bottom one.
    long_return = group_df.iloc[-1]['mean_return']
    short_return = group_df.iloc[0]['mean_return']
    ls_return = long_return - short_return

    return group_df, ls_return
 
# Quantile backtest on the test set.
group_df, ls_return = quantile_backtest(y_pred_test, y_test, n_quantiles=5)
 
print("\n=== 分层回测结果 ===")
print(group_df)
print(f"\n多空收益: {ls_return:.4f}")
 
# Bar chart of mean realized return per prediction quantile.
fig, ax = plt.subplots(figsize=(10, 5))
colors = ['red', 'orange', 'yellow', 'lightgreen', 'green']
ax.bar(group_df['group'], group_df['mean_return'], color=colors, alpha=0.7)
ax.axhline(0, color='black', linestyle='--', linewidth=0.5)
ax.set_xlabel('Quantile Group')
ax.set_ylabel('Mean Return')
ax.set_title('Quantile Backtest Results')
ax.grid(True, alpha=0.3, axis='y')
 
# Annotate each bar with its mean return.
for i, row in group_df.iterrows():
    ax.text(row['group'], row['mean_return'], f"{row['mean_return']:.4f}",
            ha='center', va='bottom' if row['mean_return'] > 0 else 'top')
 
plt.tight_layout()
plt.show()

6.3 滚动 IC 分析

def rolling_ic_analysis(dates, predictions, returns, window=42):
    """Correlation of predictions vs. returns over a trailing window (~2 months).

    Note: this is a time-window correlation — a simplification of a true
    per-date cross-sectional IC.
    """
    records = [
        {
            'date': dates[end],
            'ic': np.corrcoef(predictions[end - window:end],
                              returns[end - window:end])[0, 1],
        }
        for end in range(window, len(dates))
    ]
    return pd.DataFrame(records).set_index('date')
 
# Rolling IC analysis over the test period.
# NOTE(review): test_dates has one entry per trading day, while
# y_pred_test / y_test are per-sample arrays (stocks × days) — the lengths
# differ, so the rolling window below only covers the first
# len(test_dates) samples. Confirm whether a per-date cross-sectional IC
# was intended here.
test_dates = test_df['date'].unique()
ic_df = rolling_ic_analysis(test_dates, y_pred_test, y_test, window=42)
 
print("\n=== 滚动 IC 分析 ===")
print(f"IC 均值:   {ic_df['ic'].mean():.4f}")
print(f"IC 标准差: {ic_df['ic'].std():.4f}")
print(f"ICIR:      {ic_df['ic'].mean() / ic_df['ic'].std():.4f}")
print(f"IC > 0 比例: {(ic_df['ic'] > 0).mean():.2%}")
 
# Plot the IC time series and its distribution side by side.
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
 
# IC time series with the mean marked.
axes[0].plot(ic_df.index, ic_df['ic'], marker='o', linestyle='-', alpha=0.6)
axes[0].axhline(0, color='black', linestyle='--', linewidth=0.5)
axes[0].axhline(ic_df['ic'].mean(), color='red', linestyle='--', label=f'Mean: {ic_df["ic"].mean():.4f}')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('IC')
axes[0].set_title('Rolling IC')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
 
# Histogram of rolling IC values.
axes[1].hist(ic_df['ic'], bins=20, color='steelblue', alpha=0.7, edgecolor='black')
axes[1].axvline(ic_df['ic'].mean(), color='red', linestyle='--', linewidth=2, label=f"Mean: {ic_df['ic'].mean():.4f}")
axes[1].axvline(0, color='black', linestyle='--', linewidth=0.5)
axes[1].set_xlabel('IC')
axes[1].set_ylabel('Frequency')
axes[1].set_title('IC Distribution')
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')
 
plt.tight_layout()
plt.show()

7. 特征分析

7.1 特征重要性

# Gain-based feature importance from the tuned booster.
importance = best_model.feature_importance(importance_type='gain')
importance_df = pd.DataFrame({
    'feature': feature_cols,
    'importance': importance,
    'importance_pct': importance / importance.sum() * 100
}).sort_values('importance', ascending=False)
 
print("=== 特征重要性 ===")
print(importance_df.head(15))
 
# Horizontal bar chart of the top features.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(range(len(importance_df.head(15))),
        importance_df.head(15)['importance'],
        color='steelblue')
ax.set_yticks(range(len(importance_df.head(15))))
ax.set_yticklabels(importance_df.head(15)['feature'])
ax.set_xlabel('Importance (Gain)')
ax.set_title('Feature Importance (Top 15)')
ax.invert_yaxis()
ax.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

7.2 SHAP 分析

# SHAP analysis of the tuned model on the test features.
explainer = shap.TreeExplainer(best_model)
shap_values = explainer.shap_values(X_test)
 
# Summary plot: per-feature distribution of SHAP values.
shap.summary_plot(shap_values, X_test, feature_names=feature_cols, show=False)
plt.gcf().set_size_inches(10, 6)
plt.title("SHAP Summary Plot")
plt.tight_layout()
plt.show()
 
# Mean |SHAP| per feature as an alternative importance ranking.
shap_importance = np.abs(shap_values).mean(axis=0)
shap_df = pd.DataFrame({
    'feature': feature_cols,
    'shap_importance': shap_importance
}).sort_values('shap_importance', ascending=False)
 
print("\n=== SHAP 特征重要性 ===")
print(shap_df.head(10))

8. 滚动训练

8.1 Walk-Forward 回测

def walk_forward_backtest(splits, feature_cols, params=None):
    """
    Walk-forward backtest.

    For each split: fit a model on train/val, predict the held-out test
    window, and record IC and long-short return.
    """
    records = []
    n_periods = len(splits)

    for idx, period in enumerate(splits):
        print(f"\n=== 周期 {idx+1}/{n_periods} ===")

        # Extract matrices for this period.
        X_tr = period['train'][feature_cols].values
        y_tr = period['train']['return'].values
        X_va = period['val'][feature_cols].values
        y_va = period['val']['return'].values
        X_te = period['test'][feature_cols].values
        y_te = period['test']['return'].values

        # Fit with early stopping, then score the out-of-sample window.
        model = train_lgb_model(X_tr, y_tr, X_va, y_va, params=params)
        preds = model.predict(X_te)

        ic = calculate_ic(preds, y_te)
        _, ls_return = quantile_backtest(preds, y_te, n_quantiles=5)

        records.append({
            'period': idx + 1,
            'train_start': period['dates'][0],
            'test_end': period['dates'][1],
            'best_iteration': model.best_iteration,
            'ic': ic,
            'long_short_return': ls_return
        })

        print(f"IC: {ic:.4f}, 多空收益: {ls_return:.4f}")

    return pd.DataFrame(records)
 
# Run the walk-forward backtest (first 3 periods for demonstration).
wf_results = walk_forward_backtest(splits[:3], feature_cols, params=study.best_params)
 
# Summary of per-period results.
print("\n=== Walk-Forward 回测结果汇总 ===")
print(wf_results)
 
print("\n平均表现:")
print(f"  平均 IC:        {wf_results['ic'].mean():.4f} ± {wf_results['ic'].std():.4f}")
print(f"  平均多空收益:   {wf_results['long_short_return'].mean():.4f} ± {wf_results['long_short_return'].std():.4f}")
print(f"  IC > 0 比例:    {(wf_results['ic'] > 0).mean():.2%}")

9. 项目模板

9.1 完整项目类

class QuantTreeModelProject:
    """Reusable quant tree-model project template.

    Wires together the module-level helpers (standardize_features,
    handle_missing_values, walk_forward_split, train_lgb_model,
    calculate_ic, quantile_backtest) into a single preprocessing /
    training / evaluation pipeline.
    """
 
    def __init__(self, feature_cols, params=None):
        """
        Initialize the project.
 
        Parameters:
            feature_cols: list of feature column names
            params: LightGBM parameter dict; defaults applied when None
        """
        self.feature_cols = feature_cols
        self.params = params or self._default_params()
        self.model = None  # fitted LightGBM booster, set by train()
        self.results = {}  # latest evaluation output, set by evaluate()
 
    def _default_params(self):
        """Default LightGBM parameters (RMSE regression, mild regularization)."""
        return {
            'objective': 'regression',
            'metric': 'rmse',
            'max_depth': 6,
            'num_leaves': 31,
            'learning_rate': 0.05,
            'min_child_samples': 50,
            'bagging_fraction': 0.8,
            'feature_fraction': 0.8,
            'bagging_freq': 1,
            'lambda_l1': 0.1,
            'lambda_l2': 1.0,
            'verbose': -1
        }
 
    def prepare_data(self, df):
        """Standardize features per date and impute missing values; returns a new frame."""
        df = df.copy()
 
        # Per-date z-score standardization (module-level helper).
        df = standardize_features(df, self.feature_cols, method='zscore')
 
        # Daily-median imputation with global-median fallback.
        df = handle_missing_values(df, self.feature_cols)
 
        return df
 
    def train(self, train_df, val_df):
        """Fit the LightGBM model with early stopping; returns the booster."""
        X_train = train_df[self.feature_cols].values
        y_train = train_df['return'].values
        X_val = val_df[self.feature_cols].values
        y_val = val_df['return'].values
 
        self.model = train_lgb_model(X_train, y_train, X_val, y_val, self.params)
 
        return self.model
 
    def predict(self, df):
        """Predict returns for the rows of df with the fitted model."""
        X = df[self.feature_cols].values
        return self.model.predict(X)
 
    def evaluate(self, test_df):
        """Evaluate on a test frame: Pearson IC plus a 5-quantile backtest."""
        y_test = test_df['return'].values
        y_pred = self.predict(test_df)
 
        # Pearson IC between predictions and realized returns.
        ic = calculate_ic(y_pred, y_test)
 
        # Quantile backtest (top-minus-bottom long/short spread).
        group_df, ls_return = quantile_backtest(y_pred, y_test, n_quantiles=5)
 
        # Bundle everything for later inspection.
        results = {
            'ic': ic,
            'group_stats': group_df,
            'long_short_return': ls_return,
            'predictions': y_pred
        }
 
        self.results = results
        return results
 
    def run_pipeline(self, df):
        """Run the full flow: preprocess → split → train → evaluate."""
        print("=== 开始量化树模型流程 ===\n")
 
        # 1. Preprocessing
        print("1. 数据预处理...")
        df_processed = self.prepare_data(df)
 
        # 2. Walk-forward split (only the first period is used here).
        print("2. 时序划分...")
        splits = walk_forward_split(df_processed)
        split = splits[0]
 
        # 3. Training
        print("3. 训练模型...")
        self.train(split['train'], split['val'])
 
        # 4. Evaluation
        print("4. 评估模型...")
        results = self.evaluate(split['test'])
 
        # 5. Report
        print("\n=== 模型评估结果 ===")
        print(f"IC:            {results['ic']:.4f}")
        print(f"多空收益:       {results['long_short_return']:.4f}")
 
        return results
 
# Instantiate the template and run the full pipeline on the raw data.
project = QuantTreeModelProject(feature_cols=feature_cols)
results = project.run_pipeline(df)

9.2 保存和加载

import joblib
 
def save_project(project, filepath):
    """Persist a QuantTreeModelProject (model, params, features, results) via joblib."""
    payload = {
        'model': project.model,
        'params': project.params,
        'feature_cols': project.feature_cols,
        'results': project.results,
    }
    joblib.dump(payload, filepath)
    print(f"项目已保存到: {filepath}")
 
def load_project(filepath):
    """Rebuild a QuantTreeModelProject from a file written by save_project."""
    payload = joblib.load(filepath)

    restored = QuantTreeModelProject(
        feature_cols=payload['feature_cols'],
        params=payload['params']
    )
    restored.model = payload['model']
    restored.results = payload['results']

    print(f"项目已从 {filepath} 加载")
    return restored
 
# Save the fitted project to disk.
save_project(project, 'quant_tree_model.pkl')
 
# Reload later with:
# loaded_project = load_project('quant_tree_model.pkl')

核心知识点总结

完整流程回顾

1. 数据生成
   ├── 模拟多股票×多日数据
   ├── 生成真实因子
   └── 添加噪声和时序效应

2. 特征工程
   ├── Z-score 标准化
   └── 缺失值处理

3. 时序划分
   └── Walk-Forward + Purging

4. 模型训练
   ├── LightGBM 基础训练
   └── Optuna 超参数调优

5. 模型评估
   ├── IC / Rank IC
   ├── 分层回测
   └── 滚动 IC 分析

6. 特征分析
   ├── 内置重要性
   └── SHAP 值分析

7. 滚动训练
   └── Walk-Forward 回测验证

8. 项目封装
   └── 可复用的项目模板

关键代码片段

| 功能 | 代码 |
| --- | --- |
| 标准化 | `df.groupby('date')[col].transform(lambda x: (x - x.mean()) / x.std())` |
| 时序划分 | Walk-Forward + Purging |
| LightGBM 训练 | `lgb.train()` with early stopping |
| IC 计算 | `np.corrcoef(pred, true)[0, 1]` |
| 分组回测 | `pd.qcut(predictions, n_quantiles)` |
| SHAP 分析 | `shap.TreeExplainer(model)` |

常见问题

  1. 数据泄漏: 确保特征工程在时序划分后进行
  2. 过拟合: 使用早停和正则化
  3. IC 不稳定: 增加训练数据,简化模型
  4. 计算效率: 使用 LightGBM 原生 API

恭喜!你已经完成了树模型实战模块的学习。现在你可以:

  1. 理解梯度提升树的核心原理
  2. 正确处理量化时序数据
  3. 高效训练和调优树模型
  4. 使用量化专用指标评估模型
  5. 分析特征重要性和稳定性
  6. 构建完整的量化选股项目

继续探索和实践,不断提升你的量化机器学习技能!