实战案例
目录
1. 项目概述
1.1 目标
构建一个完整的量化选股模型,使用 LightGBM 预测股票未来收益。
1.2 流程图
数据生成 → 特征工程 → 时序划分 → 模型训练 → 模型评估 → 特征分析 → 滚动训练
↓ ↓ ↓ ↓ ↓ ↓ ↓
500股票 10个因子 Walk-Forward LightGBM IC/分层 SHAP 月度重训
×3年 标准化 + Purging +Optuna 分析 分析
1.3 环境准备
# Make sure all dependencies are installed.
import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
import shap
import warnings
from sklearn.metrics import mean_squared_error
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')
# CJK-capable font fallbacks so Chinese labels render; keep the ASCII
# minus sign so negative ticks display correctly with these fonts.
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("环境检查完成!")
print(f"NumPy: {np.__version__}")
print(f"Pandas: {pd.__version__}")
print(f"LightGBM: {lgb.__version__}")
2. 数据生成
2.1 生成模拟股票数据
def generate_stock_data(n_stocks=500, n_days=750, seed=42):
    """
    Generate a simulated stock panel dataset.

    Parameters:
        n_stocks: number of stocks
        n_days: number of trading days (~3 years)
        seed: random seed for reproducibility

    Returns:
        df: DataFrame with one row per (date, stock), ten factor
            columns and a 'return' target column
    """
    np.random.seed(seed)
    # Business-day date range.
    dates = pd.date_range('2021-01-01', periods=n_days, freq='B')
    stocks = [f'STOCK{i:04d}' for i in range(n_stocks)]
    # Build the panel date-major: the stock varies fastest within a date.
    data = []
    for date in dates:
        for stock in stocks:
            data.append({'date': date, 'stock': stock})
    df = pd.DataFrame(data)
    # Draw the base factors (simulating real-world signals).
    n = len(df)
    # 1. Momentum: 20-day return.
    df['momentum_20'] = np.random.randn(n) * 0.1
    # 2. Reversal: negative of the 5-day return.
    df['reversal_5'] = -np.random.randn(n) * 0.05
    # 3. Volatility: 20-day volatility.
    df['volatility_20'] = np.random.rand(n) * 0.5 + 0.1
    # 4. Turnover.
    df['turnover'] = np.random.rand(n) * 0.2 + 0.01
    # 5. Valuation: inverse P/E.
    df['pe_inverse'] = np.random.randn(n) * 0.02 + 0.05
    # 6. Size: log market cap.
    df['log_market_cap'] = np.random.randn(n) * 0.5 + 10
    # 7. Quality: ROE.
    df['roe'] = np.random.randn(n) * 0.05 + 0.1
    # 8. Technical: RSI.
    df['rsi'] = np.random.rand(n) * 100
    # 9. Volume change.
    df['volume_change'] = np.random.randn(n) * 0.3
    # 10. Sector dummy (simplified to an integer code).
    df['sector'] = np.random.randint(0, 10, n)
    # True return signal — only some factors carry information:
    # 0.30*momentum - 0.20*reversal - 0.15*(volatility-0.3)
    # + 0.10*(roe-0.1) + 0.05*log_market_cap, plus noise below.
    true_signal = (
        0.30 * df['momentum_20'].values +
        -0.20 * df['reversal_5'].values +
        -0.15 * (df['volatility_20'].values - 0.3) +
        0.10 * (df['roe'].values - 0.1) +
        0.05 * df['log_market_cap'].values
    )
    # Add noise plus a slow drift over time. Rows are date-major, so the
    # per-day drift must be repeated n_stocks times per day; the original
    # used np.tile, which produced a sawtooth misaligned with the dates.
    time_effect = np.linspace(0, 0.5, n_days)
    time_aligned = np.repeat(time_effect, n_stocks)
    df['return'] = true_signal * 0.5 + np.random.randn(n) * 0.3 + time_aligned * 0.001
    # Inject ~2% missing values into turnover.
    missing_mask = np.random.rand(n) < 0.02
    df.loc[missing_mask, 'turnover'] = np.nan
    return df
# Generate the dataset.
df = generate_stock_data(n_stocks=500, n_days=750)
print("数据生成完成!")
print(f"总样本数: {len(df)}")
print(f"时间范围: {df['date'].min()} 到 {df['date'].max()}")
print(f"股票数量: {df['stock'].nunique()}")
print(f"\n数据预览:")
print(df.head(10))
2.2 数据探索
# Return distribution across all samples.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# Histogram of per-sample returns.
axes[0].hist(df['return'], bins=50, color='steelblue', alpha=0.7, edgecolor='black')
axes[0].axvline(df['return'].mean(), color='red', linestyle='--', linewidth=2, label=f"均值: {df['return'].mean():.4f}")
axes[0].set_xlabel('Return')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Return Distribution')
axes[0].legend()
axes[0].grid(True, alpha=0.3, axis='y')
# Time series of the daily cross-sectional mean return.
daily_return = df.groupby('date')['return'].mean()
axes[1].plot(daily_return.index, daily_return.values, color='steelblue', linewidth=1)
axes[1].axhline(0, color='black', linestyle='--', linewidth=0.5)
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Average Return')
axes[1].set_title('Daily Average Return')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Descriptive statistics for the numeric factor columns
# ('sector' is excluded: it is a categorical code, not a factor value).
feature_cols = ['momentum_20', 'reversal_5', 'volatility_20', 'turnover',
                'pe_inverse', 'log_market_cap', 'roe', 'rsi', 'volume_change']
print("\n特征统计:")
print(df[feature_cols].describe().round(4))
3. 特征工程
3.1 特征标准化
def standardize_features(df, feature_cols, method='zscore'):
    """
    Cross-sectionally standardize features (per trading date).

    Both methods use only same-day data, so there is no look-ahead bias
    by construction (the original comment claimed a "rolling window",
    which this never was).

    Parameters:
        df: input data with a 'date' column
        feature_cols: feature columns to standardize
        method: 'zscore' (per-date z-score) or 'rank' (per-date
            percentile rank mapped to [-1, 1]; more robust to outliers)

    Returns:
        df: a new DataFrame with the features standardized; the input
            is not modified

    Raises:
        ValueError: if method is neither 'zscore' nor 'rank'
    """
    df = df.copy()
    if method == 'zscore':
        # groupby.transform broadcasts per-date stats back to each row,
        # replacing the original merge/drop of temporary columns.
        grouped = df.groupby('date')
        for col in feature_cols:
            mean = grouped[col].transform('mean')
            std = grouped[col].transform('std')
            # Epsilon guards against zero cross-sectional dispersion.
            df[col] = (df[col] - mean) / (std + 1e-8)
    elif method == 'rank':
        for col in feature_cols:
            df[col] = df.groupby('date')[col].rank(pct=True)
            df[col] = (df[col] - 0.5) * 2  # map (0, 1] -> (-1, 1]
    else:
        # Fail loudly instead of silently returning unstandardized data.
        raise ValueError(f"unknown method: {method!r}")
    return df
# Apply per-date z-score standardization.
df_processed = standardize_features(df, feature_cols, method='zscore')
print("特征标准化完成")
print(df_processed[feature_cols].describe().round(4))
3.2 处理缺失值
# Missing-value handling
def handle_missing_values(df, feature_cols):
    """
    Fill missing feature values with the same-day cross-sectional median.

    Any value still missing afterwards (e.g. an entire date is NaN for
    a column) is filled with the column's global median.

    Parameters:
        df: input data with a 'date' column
        feature_cols: feature columns to fill

    Returns:
        df: a new DataFrame with missing values filled; the input is no
            longer mutated (the original filled the caller's frame in
            place as a side effect)
    """
    df = df.copy()  # fix: do not mutate the caller's DataFrame
    for col in feature_cols:
        daily_median = df.groupby('date')[col].transform('median')
        df[col] = df[col].fillna(daily_median)
    # Fall back to the global median if anything is still missing.
    remaining_nulls = df[feature_cols].isnull().sum()
    if remaining_nulls.sum() > 0:
        print("警告:仍有缺失值,用全局中位数填充")
        df[feature_cols] = df[feature_cols].fillna(df[feature_cols].median())
    return df
df_processed = handle_missing_values(df_processed, feature_cols)
# Sanity check: should report zero remaining NaNs.
print(f"\n缺失值检查: {df_processed[feature_cols].isnull().sum().sum()}")
4. 时序划分
4.1 Walk-Forward 划分
def walk_forward_split(df, train_size=1.5, val_size=0.25, test_size=0.25):
    """
    Walk-forward time-series split.

    Window sizes are given in years and converted to trading days
    (252/year). Each period trains on the `train_size` years before the
    validation window, validates on the next `val_size` years, tests on
    the following `test_size` years, then rolls the test window forward
    by `test_size` years.

    Parameters:
        df: data with a 'date' column
        train_size: training window length in years
        val_size: validation window length in years
        test_size: test window length in years

    Returns:
        splits: list of dicts with 'train'/'val'/'test' DataFrames and
            'dates' = (first train date, last test date)
    """
    df = df.sort_values('date').reset_index(drop=True)
    # Hoist the sorted unique-date array out of the loop: the original
    # recomputed df['date'].unique() three times per iteration.
    unique_dates = df['date'].unique()
    total_days = len(unique_dates)
    train_days = int(train_size * 252)
    val_days = int(val_size * 252)
    test_days = int(test_size * 252)
    splits = []
    current_test_start = train_days + val_days
    while current_test_start + test_days <= total_days:
        train_end = current_test_start - val_days
        train_start = max(0, train_end - train_days)
        test_end = min(current_test_start + test_days, total_days)
        train_dates = unique_dates[train_start:train_end]
        val_dates = unique_dates[train_end:current_test_start]
        test_dates = unique_dates[current_test_start:test_end]
        splits.append({
            'train': df[df['date'].isin(train_dates)],
            'val': df[df['date'].isin(val_dates)],
            'test': df[df['date'].isin(test_dates)],
            'dates': (train_dates[0], test_dates[-1])
        })
        current_test_start += test_days
    return splits
# Split the data.
splits = walk_forward_split(df_processed, train_size=1.5, val_size=0.25, test_size=0.25)
print(f"生成 {len(splits)} 个 Walk-Forward 周期")
# Show the first three periods.
for i, split in enumerate(splits[:3]):
    print(f"\n周期 {i+1}:")
    print(f" 训练: {split['dates'][0]} 到 {split['train']['date'].max()} ({len(split['train']['date'].unique())} 天)")
    print(f" 验证: {split['val']['date'].min()} 到 {split['val']['date'].max()} ({len(split['val']['date'].unique())} 天)")
    print(f" 测试: {split['test']['date'].min()} 到 {split['dates'][1]} ({len(split['test']['date'].unique())} 天)")
4.2 使用第一个周期进行初始训练
# Demonstrate with the first walk-forward split.
split = splits[0]
train_df = split['train']
val_df = split['val']
test_df = split['test']
# NumPy matrix/vector views for LightGBM.
X_train = train_df[feature_cols].values
y_train = train_df['return'].values
X_val = val_df[feature_cols].values
y_val = val_df['return'].values
X_test = test_df[feature_cols].values
y_test = test_df['return'].values
print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")
5. 模型训练与调优
5.1 基础模型训练
# LightGBM training helper
def train_lgb_model(X_train, y_train, X_val, y_val, params=None):
    """Fit a LightGBM regressor with early stopping on the validation set.

    When `params` is None, conservative defaults are used: moderate tree
    depth, row/column subsampling and L1/L2 regularisation to limit
    overfitting on noisy return targets. Returns the trained booster.
    """
    if params is None:
        params = {
            'objective': 'regression',
            'metric': 'rmse',
            'max_depth': 6,
            'num_leaves': 31,
            'learning_rate': 0.05,
            'min_child_samples': 50,
            'bagging_fraction': 0.8,
            'feature_fraction': 0.8,
            'bagging_freq': 1,
            'lambda_l1': 0.1,
            'lambda_l2': 1.0,
            'verbose': -1,
        }
    dtrain = lgb.Dataset(X_train, label=y_train)
    dvalid = lgb.Dataset(X_val, label=y_val, reference=dtrain)
    booster = lgb.train(
        params,
        dtrain,
        num_boost_round=2000,
        valid_sets=[dtrain, dvalid],
        callbacks=[
            lgb.early_stopping(stopping_rounds=100),
            lgb.log_evaluation(period=200),
        ],
    )
    return booster
# Train with default hyperparameters.
model = train_lgb_model(X_train, y_train, X_val, y_val)
print(f"\n最佳迭代数: {model.best_iteration}")
5.2 Optuna 超参数调优
def objective_lgb(trial, X_train, y_train, X_val, y_val):
    """
    Optuna objective: train LightGBM with sampled hyperparameters and
    return the validation MSE (minimised by the study).

    Parameters:
        trial: optuna.Trial used to sample hyperparameters
        X_train, y_train: training features/targets
        X_val, y_val: validation features/targets

    Returns:
        mse: mean squared error on the validation set
    """
    params = {
        'objective': 'regression',
        'metric': 'rmse',
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'num_leaves': trial.suggest_int('num_leaves', 15, 127),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
        'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
        'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
        'lambda_l1': trial.suggest_float('lambda_l1', 0, 1),
        'lambda_l2': trial.suggest_float('lambda_l2', 0, 10),
        'verbose': -1
    }
    train_data = lgb.Dataset(X_train, label=y_train)
    valid_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
    model = lgb.train(
        params,
        train_data,
        num_boost_round=1000,
        valid_sets=[valid_data],
        # fix: log_evaluation expects an int period; 0 disables logging
        # (the original passed period=False, relying on False == 0).
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
    )
    y_pred = model.predict(X_val, num_iteration=model.best_iteration)
    mse = mean_squared_error(y_val, y_pred)
    return mse
# Run Optuna (small trial count for demonstration).
print("开始超参数优化...")
study = optuna.create_study(direction='minimize')
study.optimize(lambda trial: objective_lgb(trial, X_train, y_train, X_val, y_val),
               n_trials=20, show_progress_bar=True)
print(f"\n最佳参数: {study.best_params}")
print(f"最佳验证 MSE: {study.best_value:.4f}")
# Retrain the final model with the best parameters.
best_model = train_lgb_model(X_train, y_train, X_val, y_val, params=study.best_params)
6. 模型评估
6.1 IC 分析
def calculate_ic(predictions, returns):
    """Information coefficient: Pearson correlation between predictions
    and realised returns, ignoring pairs where either side is NaN."""
    valid = ~np.isnan(predictions) & ~np.isnan(returns)
    return np.corrcoef(predictions[valid], returns[valid])[0, 1]
# Predictions on all three splits.
y_pred_train = best_model.predict(X_train)
y_pred_val = best_model.predict(X_val)
y_pred_test = best_model.predict(X_test)
# Pearson IC per split; a large train/test gap indicates overfitting.
ic_train = calculate_ic(y_pred_train, y_train)
ic_val = calculate_ic(y_pred_val, y_val)
ic_test = calculate_ic(y_pred_test, y_test)
print("=== IC 分析 ===")
print(f"训练集 IC: {ic_train:.4f}")
print(f"验证集 IC: {ic_val:.4f}")
print(f"测试集 IC: {ic_test:.4f}")
# Rank IC (Spearman) is robust to outliers and monotone transforms.
from scipy.stats import spearmanr
rank_ic_test = spearmanr(y_pred_test, y_test)[0]
print(f"测试集 Rank IC: {rank_ic_test:.4f}")
6.2 分层回测
def quantile_backtest(predictions, returns, n_quantiles=5):
    """Bucket samples into prediction quantiles and report per-bucket
    return statistics plus the top-minus-bottom (long-short) spread.

    Returns (group_df, long_short_return) where group_df has one row per
    quantile (1 = lowest predictions) with count, mean prediction, mean
    return and return std.
    """
    frame = pd.DataFrame({'prediction': predictions, 'return': returns}).dropna()
    # Assign quantile buckets; group 0 holds the lowest predictions.
    frame['group'] = pd.qcut(frame['prediction'], n_quantiles,
                             labels=False, duplicates='drop')
    rows = []
    for g in range(n_quantiles):
        bucket = frame.loc[frame['group'] == g]
        rows.append({
            'group': g + 1,
            'n': len(bucket),
            'mean_pred': bucket['prediction'].mean(),
            'mean_return': bucket['return'].mean(),
            'std_return': bucket['return'].std(),
        })
    group_df = pd.DataFrame(rows)
    # Long the top bucket, short the bottom bucket.
    spread = group_df['mean_return'].iloc[-1] - group_df['mean_return'].iloc[0]
    return group_df, spread
# Quantile backtest on the test set.
group_df, ls_return = quantile_backtest(y_pred_test, y_test, n_quantiles=5)
print("\n=== 分层回测结果 ===")
print(group_df)
print(f"\n多空收益: {ls_return:.4f}")
# Visualise: if the predictions rank stocks well, the mean return
# should increase monotonically across the quantile groups.
fig, ax = plt.subplots(figsize=(10, 5))
colors = ['red', 'orange', 'yellow', 'lightgreen', 'green']
ax.bar(group_df['group'], group_df['mean_return'], color=colors, alpha=0.7)
ax.axhline(0, color='black', linestyle='--', linewidth=0.5)
ax.set_xlabel('Quantile Group')
ax.set_ylabel('Mean Return')
ax.set_title('Quantile Backtest Results')
ax.grid(True, alpha=0.3, axis='y')
# Value labels above (positive) or below (negative) each bar.
for i, row in group_df.iterrows():
    ax.text(row['group'], row['mean_return'], f"{row['mean_return']:.4f}",
            ha='center', va='bottom' if row['mean_return'] > 0 else 'top')
plt.tight_layout()
plt.show()
6.3 滚动 IC 分析
def rolling_ic_analysis(dates, predictions, returns, window=42):
    """
    Rolling correlation between predictions and returns over a trailing
    window (~2 months at window=42 observations).

    NOTE(review): this is NOT a cross-sectional IC. The caller passes
    `dates` = unique test dates but `predictions`/`returns` with one row
    per (date, stock), so len(predictions) >> len(dates); the index-based
    slices below only touch the first len(dates) samples and do not line
    up with `dates[i]`. A faithful rolling IC would first compute the
    per-date cross-sectional correlation — confirm intent before relying
    on these numbers.

    Parameters:
        dates: sequence of timestamps (one per output row)
        predictions: predicted values
        returns: realised values
        window: trailing window length in observations

    Returns:
        DataFrame indexed by 'date' with an 'ic' column.
    """
    ic_series = []
    for i in range(window, len(dates)):
        pred_window = predictions[i-window:i]
        ret_window = returns[i-window:i]
        # Pearson correlation over the trailing window — a simplified
        # time-window proxy for a true per-date cross-sectional IC.
        ic = np.corrcoef(pred_window, ret_window)[0, 1]
        ic_series.append({'date': dates[i], 'ic': ic})
    return pd.DataFrame(ic_series).set_index('date')
# Rolling IC over the test window.
test_dates = test_df['date'].unique()
ic_df = rolling_ic_analysis(test_dates, y_pred_test, y_test, window=42)
print("\n=== 滚动 IC 分析 ===")
print(f"IC 均值: {ic_df['ic'].mean():.4f}")
print(f"IC 标准差: {ic_df['ic'].std():.4f}")
# ICIR = mean(IC) / std(IC): stability-adjusted signal strength.
print(f"ICIR: {ic_df['ic'].mean() / ic_df['ic'].std():.4f}")
print(f"IC > 0 比例: {(ic_df['ic'] > 0).mean():.2%}")
# Visualise the IC series and its distribution.
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
# IC time series.
axes[0].plot(ic_df.index, ic_df['ic'], marker='o', linestyle='-', alpha=0.6)
axes[0].axhline(0, color='black', linestyle='--', linewidth=0.5)
axes[0].axhline(ic_df['ic'].mean(), color='red', linestyle='--', label=f'Mean: {ic_df["ic"].mean():.4f}')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('IC')
axes[0].set_title('Rolling IC')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# IC histogram.
axes[1].hist(ic_df['ic'], bins=20, color='steelblue', alpha=0.7, edgecolor='black')
axes[1].axvline(ic_df['ic'].mean(), color='red', linestyle='--', linewidth=2, label=f"Mean: {ic_df['ic'].mean():.4f}")
axes[1].axvline(0, color='black', linestyle='--', linewidth=0.5)
axes[1].set_xlabel('IC')
axes[1].set_ylabel('Frequency')
axes[1].set_title('IC Distribution')
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
7. 特征分析
7.1 特征重要性
# Built-in feature importance (total split gain per feature).
importance = best_model.feature_importance(importance_type='gain')
importance_df = pd.DataFrame({
    'feature': feature_cols,
    'importance': importance,
    'importance_pct': importance / importance.sum() * 100
}).sort_values('importance', ascending=False)
print("=== 特征重要性 ===")
print(importance_df.head(15))
# Horizontal bar chart, most important feature at the top.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(range(len(importance_df.head(15))),
        importance_df.head(15)['importance'],
        color='steelblue')
ax.set_yticks(range(len(importance_df.head(15))))
ax.set_yticklabels(importance_df.head(15)['feature'])
ax.set_xlabel('Importance (Gain)')
ax.set_title('Feature Importance (Top 15)')
ax.invert_yaxis()
ax.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()
7.2 SHAP 分析
# SHAP analysis: per-sample additive attributions from the tree model.
explainer = shap.TreeExplainer(best_model)
shap_values = explainer.shap_values(X_test)
# Summary (beeswarm) plot of attributions across the test set.
shap.summary_plot(shap_values, X_test, feature_names=feature_cols, show=False)
plt.gcf().set_size_inches(10, 6)
plt.title("SHAP Summary Plot")
plt.tight_layout()
plt.show()
# Mean absolute SHAP value per feature as an importance measure.
shap_importance = np.abs(shap_values).mean(axis=0)
shap_df = pd.DataFrame({
    'feature': feature_cols,
    'shap_importance': shap_importance
}).sort_values('shap_importance', ascending=False)
print("\n=== SHAP 特征重要性 ===")
print(shap_df.head(10))
8. 滚动训练
8.1 Walk-Forward 回测
def walk_forward_backtest(splits, feature_cols, params=None):
    """
    Walk-forward backtest.

    For each split: fit a model on train/val with early stopping, then
    score the out-of-sample test window (IC and long-short spread).

    Returns a DataFrame with one row per period (period index, window
    boundaries, best iteration, IC, long-short return).
    """
    records = []
    n_periods = len(splits)
    for idx, split in enumerate(splits, start=1):
        print(f"\n=== 周期 {idx}/{n_periods} ===")
        frames = {name: split[name] for name in ('train', 'val', 'test')}
        X = {k: v[feature_cols].values for k, v in frames.items()}
        y = {k: v['return'].values for k, v in frames.items()}
        # Fit on train with early stopping against val.
        model = train_lgb_model(X['train'], y['train'], X['val'], y['val'], params=params)
        # Out-of-sample evaluation.
        preds = model.predict(X['test'])
        ic = calculate_ic(preds, y['test'])
        _, ls_return = quantile_backtest(preds, y['test'], n_quantiles=5)
        records.append({
            'period': idx,
            'train_start': split['dates'][0],
            'test_end': split['dates'][1],
            'best_iteration': model.best_iteration,
            'ic': ic,
            'long_short_return': ls_return
        })
        print(f"IC: {ic:.4f}, 多空收益: {ls_return:.4f}")
    return pd.DataFrame(records)
# Run the walk-forward backtest (first 3 periods for demonstration).
wf_results = walk_forward_backtest(splits[:3], feature_cols, params=study.best_params)
# Aggregate across periods.
print("\n=== Walk-Forward 回测结果汇总 ===")
print(wf_results)
print("\n平均表现:")
print(f" 平均 IC: {wf_results['ic'].mean():.4f} ± {wf_results['ic'].std():.4f}")
print(f" 平均多空收益: {wf_results['long_short_return'].mean():.4f} ± {wf_results['long_short_return'].std():.4f}")
print(f" IC > 0 比例: {(wf_results['ic'] > 0).mean():.2%}")
9. 项目模板
9.1 完整项目类
class QuantTreeModelProject:
    """Template project for a tree-based quant stock-selection model.

    Bundles the module-level helpers (standardisation, missing-value
    handling, walk-forward splitting, LightGBM training, IC/quantile
    evaluation) behind one object so the whole workflow can be run with
    a single call to `run_pipeline`.
    """

    def __init__(self, feature_cols, params=None):
        """
        Parameters:
            feature_cols: list of feature column names
            params: optional LightGBM parameter dict; defaults are used
                when omitted
        """
        self.feature_cols = feature_cols
        self.params = params or self._default_params()
        self.model = None      # trained booster, set by train()
        self.results = {}      # last evaluation results, set by evaluate()

    def _default_params(self):
        """Baseline LightGBM parameters (regularised regression)."""
        return {
            'objective': 'regression',
            'metric': 'rmse',
            'max_depth': 6,
            'num_leaves': 31,
            'learning_rate': 0.05,
            'min_child_samples': 50,
            'bagging_fraction': 0.8,
            'feature_fraction': 0.8,
            'bagging_freq': 1,
            'lambda_l1': 0.1,
            'lambda_l2': 1.0,
            'verbose': -1,
        }

    def prepare_data(self, df):
        """Standardise features and fill missing values; returns a new frame."""
        prepared = df.copy()
        prepared = standardize_features(prepared, self.feature_cols, method='zscore')
        return handle_missing_values(prepared, self.feature_cols)

    def train(self, train_df, val_df):
        """Fit the LightGBM model on train/val frames; returns the booster."""
        X_tr, y_tr = train_df[self.feature_cols].values, train_df['return'].values
        X_va, y_va = val_df[self.feature_cols].values, val_df['return'].values
        self.model = train_lgb_model(X_tr, y_tr, X_va, y_va, self.params)
        return self.model

    def predict(self, df):
        """Predict returns for every row of `df`."""
        return self.model.predict(df[self.feature_cols].values)

    def evaluate(self, test_df):
        """Score the fitted model on a test frame (IC + quantile backtest)."""
        actual = test_df['return'].values
        predicted = self.predict(test_df)
        group_stats, spread = quantile_backtest(predicted, actual, n_quantiles=5)
        self.results = {
            'ic': calculate_ic(predicted, actual),
            'group_stats': group_stats,
            'long_short_return': spread,
            'predictions': predicted,
        }
        return self.results

    def run_pipeline(self, df):
        """Run preprocess → split → train → evaluate on the first split."""
        print("=== 开始量化树模型流程 ===\n")
        print("1. 数据预处理...")
        prepared = self.prepare_data(df)
        print("2. 时序划分...")
        first_split = walk_forward_split(prepared)[0]
        print("3. 训练模型...")
        self.train(first_split['train'], first_split['val'])
        print("4. 评估模型...")
        outcome = self.evaluate(first_split['test'])
        print("\n=== 模型评估结果 ===")
        print(f"IC: {outcome['ic']:.4f}")
        print(f"多空收益: {outcome['long_short_return']:.4f}")
        return outcome
# Use the template end-to-end.
project = QuantTreeModelProject(feature_cols=feature_cols)
results = project.run_pipeline(df)
9.2 保存和加载
import joblib
def save_project(project, filepath):
    """Persist the model, parameters, feature list and results via joblib."""
    payload = {
        'model': project.model,
        'params': project.params,
        'feature_cols': project.feature_cols,
        'results': project.results,
    }
    joblib.dump(payload, filepath)
    print(f"项目已保存到: {filepath}")
def load_project(filepath):
    """Restore a QuantTreeModelProject previously written by save_project."""
    payload = joblib.load(filepath)
    restored = QuantTreeModelProject(
        feature_cols=payload['feature_cols'],
        params=payload['params'],
    )
    restored.model = payload['model']
    restored.results = payload['results']
    print(f"项目已从 {filepath} 加载")
    return restored
# 保存
save_project(project, 'quant_tree_model.pkl')
# 加载
# loaded_project = load_project('quant_tree_model.pkl')核心知识点总结
完整流程回顾
1. 数据生成
├── 模拟多股票×多日数据
├── 生成真实因子
└── 添加噪声和时序效应
2. 特征工程
├── Z-score 标准化
└── 缺失值处理
3. 时序划分
└── Walk-Forward(注意:示例代码未实现 Purging,实际应用时需在训练窗与测试窗之间留出间隔)
4. 模型训练
├── LightGBM 基础训练
└── Optuna 超参数调优
5. 模型评估
├── IC / Rank IC
├── 分层回测
└── 滚动 IC 分析
6. 特征分析
├── 内置重要性
└── SHAP 值分析
7. 滚动训练
└── Walk-Forward 回测验证
8. 项目封装
└── 可复用的项目模板
关键代码片段
| 功能 | 代码 |
|---|---|
| 标准化 | df.groupby('date')[col].transform(lambda x: (x - x.mean()) / x.std()) |
| 时序划分 | Walk-Forward + Purging |
| LightGBM 训练 | lgb.train() with early stopping |
| IC 计算 | np.corrcoef(pred, true)[0, 1] |
| 分组回测 | pd.qcut(predictions, n_quantiles) |
| SHAP 分析 | shap.TreeExplainer(model) |
常见问题
- 数据泄漏: 确保特征工程在时序划分后进行
- 过拟合: 使用早停和正则化
- IC 不稳定: 增加训练数据,简化模型
- 计算效率: 使用 LightGBM 原生 API
恭喜!你已经完成了树模型实战模块的学习。现在你可以:
- 理解梯度提升树的核心原理
- 正确处理量化时序数据
- 高效训练和调优树模型
- 使用量化专用指标评估模型
- 分析特征重要性和稳定性
- 构建完整的量化选股项目
继续探索和实践,不断提升你的量化机器学习技能!