Model Training
1. LightGBM Training Workflow
1.1 Basic Training Workflow
Data preparation
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
# Load data
X = np.random.randn(10000, 100)  # 10,000 samples, 100 features
y = np.random.randn(10000)       # target variable (return to predict)
# Split into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, shuffle=False  # never shuffle time-series data
)
# Build LightGBM datasets
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

Parameter setup
params = {
    # Core
    'objective': 'regression',
    'metric': 'rmse',
    # Model complexity
    'num_leaves': 31,
    'max_depth': -1,
    'min_data_in_leaf': 20,
    # Learning rate (the number of rounds is passed to lgb.train as
    # num_boost_round, so an 'n_estimators' entry here would be redundant)
    'learning_rate': 0.05,
    # Regularization
    'lambda_l1': 0.0,
    'lambda_l2': 0.0,
    # Sampling
    'bagging_fraction': 0.8,
    'feature_fraction': 0.8,
    'bagging_freq': 5,
    # Misc
    'verbosity': -1,
    'n_jobs': -1,
}

Training the model
# Train
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)
# Predict
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)
# Evaluate
from sklearn.metrics import mean_squared_error, r2_score
from scipy.stats import pearsonr, spearmanr
train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
val_rmse = np.sqrt(mean_squared_error(y_val, y_pred_val))
train_r2 = r2_score(y_train, y_pred_train)
val_r2 = r2_score(y_val, y_pred_val)
train_ic = pearsonr(y_train, y_pred_train)[0]
val_ic = pearsonr(y_val, y_pred_val)[0]
train_rank_ic = spearmanr(y_train, y_pred_train)[0]
val_rank_ic = spearmanr(y_val, y_pred_val)[0]
print(f"Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}")
print(f"Train R2: {train_r2:.4f}, Val R2: {val_r2:.4f}")
print(f"Train IC: {train_ic:.4f}, Val IC: {val_ic:.4f}")
print(f"Train Rank IC: {train_rank_ic:.4f}, Val Rank IC: {val_rank_ic:.4f}")

1.2 Using the sklearn API
Advantages
- Seamless integration with the sklearn ecosystem
- Works with Pipeline and GridSearchCV
- Familiar to sklearn users
Code example
from lightgbm import LGBMRegressor
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
# sklearn-style estimator (native parameter names are passed through as-is)
model = LGBMRegressor(
    objective='regression',
    num_leaves=31,
    learning_rate=0.05,
    n_estimators=1000,
    min_data_in_leaf=20,
    bagging_fraction=0.8,
    feature_fraction=0.8,
    bagging_freq=5,
    verbosity=-1,
    n_jobs=-1,
)
# Fit
model.fit(
    X_train, y_train,
    eval_set=[(X_train, y_train), (X_val, y_val)],
    eval_metric='rmse',
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=False),
        lgb.log_evaluation(period=100)
    ]
)
# Predict
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)

Hyperparameter tuning
# Parameter grid
param_grid = {
    'num_leaves': [31, 63, 127],
    'learning_rate': [0.01, 0.05, 0.1],
    'min_data_in_leaf': [10, 20, 50],
    'bagging_fraction': [0.7, 0.8, 0.9],
    'feature_fraction': [0.7, 0.8, 0.9],
}
# Time-series cross-validation
tscv = TimeSeriesSplit(n_splits=5)
# Grid search
grid_search = GridSearchCV(
    estimator=LGBMRegressor(objective='regression', n_estimators=1000),
    param_grid=param_grid,
    cv=tscv,
    scoring='neg_root_mean_squared_error',
    n_jobs=-1,
    verbose=2
)
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best score: {-grid_search.best_score_:.4f}")
# Use the best model
best_model = grid_search.best_estimator_

2. Quant-Specific Training Strategies
2.1 IC-Oriented Training
Why does IC matter?
In quantitative investing we care about the rank correlation between predictions and realized values (IC), not the point-wise prediction error (RMSE): portfolios are built from the cross-sectional ordering of the predictions, not from their absolute scale.
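A quick numeric sketch of how the two metrics can diverge (synthetic numbers): shrinking every prediction tenfold ruins RMSE but leaves the ranking -- and hence any long-short portfolio built from it -- untouched.
import numpy as np
from scipy.stats import spearmanr
from sklearn.metrics import mean_squared_error

y_true = np.array([0.03, -0.01, 0.02, -0.02, 0.01])
y_pred = np.array([0.02, -0.02, 0.01, -0.03, 0.00])  # same ordering as y_true
for scale in (1.0, 0.1):
    p = y_pred * scale
    rmse = np.sqrt(mean_squared_error(y_true, p))
    rank_ic = spearmanr(y_true, p)[0]
    print(f"scale={scale}: RMSE={rmse:.4f}, Rank IC={rank_ic:.2f}")
# scale=1.0 and scale=0.1 give very different RMSE but an identical Rank IC of 1.00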
Custom loss function
def ic_loss(preds, train_data):
    """
    Custom objective: maximize the Pearson correlation (IC) between
    predictions and labels.

    With centered variables p~ = preds - mean(preds), y~ = labels - mean(labels):
        IC = sum(p~ * y~) / (n * std(preds) * std(labels))
        dIC/dpred_i = (y~_i / (std_p * std_y) - IC * p~_i / std_p**2) / n
    We minimize -IC, so the gradient below is the negative of that.
    """
    labels = train_data.get_label()
    n = len(preds)
    if preds.std() < 1e-8:
        # Degenerate case (e.g. the first boosting round, all-equal preds):
        # fall back to an MSE-style gradient to get training started
        return preds - labels, np.ones_like(preds)
    preds_c = preds - preds.mean()
    labels_c = labels - labels.mean()
    std_p = preds.std()
    std_y = labels.std()
    ic = np.mean(preds_c * labels_c) / (std_p * std_y)
    grad = -(labels_c / (std_p * std_y) - ic * preds_c / std_p ** 2) / n
    # Hessian: the exact second derivative is unwieldy; a constant is a
    # common (crude) approximation
    hess = np.ones_like(preds)
    return grad, hess

def ic_metric(preds, train_data):
    """IC evaluation metric."""
    labels = train_data.get_label()
    ic = np.corrcoef(preds, labels)[0, 1]
    return 'ic', ic, True  # True: higher is better

Using the custom loss function
# Parameters -- in LightGBM >= 4.0 a custom objective is passed as a callable
# in params (the old fobj argument of lgb.train was removed)
params = {
    'objective': ic_loss,  # custom objective
    'metric': 'custom',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'bagging_fraction': 0.8,
    'feature_fraction': 0.8,
    'bagging_freq': 5,
    'verbosity': -1,
}
# Train
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    feval=ic_metric,  # custom evaluation metric
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

Rank IC optimization
def rank_ic_loss(preds, train_data):
    """
    Heuristic objective aimed at the Spearman rank correlation (Rank IC).

    Ranks are piecewise-constant in the predictions, so their true gradient is
    zero almost everywhere; this surrogate treats the ranks as fixed values
    and reuses the Pearson-style gradient on them. A crude approximation.
    """
    labels = train_data.get_label()
    # Ranks
    rank_pred = pd.Series(preds).rank().to_numpy()
    rank_label = pd.Series(labels).rank().to_numpy()
    # Rank IC
    rank_ic = np.corrcoef(rank_pred, rank_label)[0, 1]
    # Approximate gradient on the centered ranks
    rank_pred_c = rank_pred - rank_pred.mean()
    rank_label_c = rank_label - rank_label.mean()
    grad = -(rank_label_c - rank_ic * rank_pred_c) / (rank_pred.std() * rank_label.std())
    hess = np.ones_like(preds)  # constant Hessian approximation
    return grad, hess

def rank_ic_metric(preds, train_data):
    """Rank IC evaluation metric."""
    labels = train_data.get_label()
    rank_pred = pd.Series(preds).rank()
    rank_label = pd.Series(labels).rank()
    rank_ic = np.corrcoef(rank_pred, rank_label)[0, 1]
    return 'rank_ic', rank_ic, True

2.2 Group-wise Training
Why group?
Quant data usually has a hierarchical structure:
- a time dimension (samples from different trading days)
- a stock dimension (samples from different stocks)
Samples from the same group must not be split between the training and validation sets, as the sketch below shows.
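A minimal sketch of a group-aware split (hypothetical data: 100 trading days with 21 samples each, sorted by time): cut at a day boundary so that every sample of a given day falls entirely on one side.
import numpy as np

times = np.repeat(np.arange(100), 21)         # hypothetical per-sample day ids
cut_day = np.quantile(np.unique(times), 0.8)  # last ~20% of days become validation
train_idx = np.where(times <= cut_day)[0]
val_idx = np.where(times > cut_day)[0]
# No day straddles the boundary, so there is no leakage between the two sets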
Implementation
# Per-sample time and stock identifiers (data assumed sorted by time)
times = np.arange(len(X)) // 21   # hypothetical: every 21 consecutive samples form one time group
stocks = np.arange(len(X)) % 100  # hypothetical: 100 stocks
# Indices of the chronological 80/20 split used above (shuffle=False); in
# practice, cut at a group boundary as sketched earlier
train_idx = np.arange(len(X_train))
val_idx = np.arange(len(X_train), len(X))
# `group` expects the SIZE of each consecutive group (samples per group, in
# order), not per-sample group ids
_, train_group_sizes = np.unique(times[train_idx], return_counts=True)
_, val_group_sizes = np.unique(times[val_idx], return_counts=True)
train_data = lgb.Dataset(X_train, label=y_train, group=train_group_sizes)
val_data = lgb.Dataset(X_val, label=y_val, group=val_group_sizes, reference=train_data)

LambdaRank training
# LambdaRank (suited to ranking tasks)
# Note: lambdarank expects non-negative integer relevance labels (e.g. returns
# binned into quantile grades) together with the grouped Datasets built above
params = {
    'objective': 'lambdarank',
    'metric': 'ndcg',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'verbosity': -1,
}
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

2.3 Online Learning
When it applies
- Data arrives continuously
- The model needs to be refreshed in near real time
- Market regimes change quickly
Incremental training
class OnlineLightGBM:
    """
    Online-learning wrapper around LightGBM.

    Strategy:
    1. Initial fit on historical data
    2. Incremental updates: periodically continue training on new data
    3. Window control: cap the size of the training window
    """
    def __init__(self, params, window_size=1000, update_freq=100):
        self.params = params
        self.window_size = window_size
        self.update_freq = update_freq  # intended cadence; scheduling is left to the caller
        self.model = None
        self.X_train = None
        self.y_train = None

    def initial_train(self, X, y):
        """Initial fit on the historical window."""
        self.X_train, self.y_train = X, y
        train_data = lgb.Dataset(X, label=y)
        # No validation set is held out here, so no early stopping
        self.model = lgb.train(
            self.params,
            train_data,
            num_boost_round=1000,
            callbacks=[lgb.log_evaluation(period=100)]
        )

    def update(self, X_new, y_new):
        """Incremental update with a new batch."""
        if self.model is None:
            self.initial_train(X_new, y_new)
            return
        # Append the new batch (raw arrays are kept on the instance because
        # lgb.Dataset frees its raw data after construction by default)
        X_combined = np.vstack([self.X_train, X_new])
        y_combined = np.hstack([self.y_train, y_new])
        # Enforce the rolling window
        if len(X_combined) > self.window_size:
            X_combined = X_combined[-self.window_size:]
            y_combined = y_combined[-self.window_size:]
        self.X_train, self.y_train = X_combined, y_combined
        train_data = lgb.Dataset(X_combined, label=y_combined)
        self.model = lgb.train(
            self.params,
            train_data,
            num_boost_round=100,
            init_model=self.model,  # continue from the existing trees
            callbacks=[lgb.log_evaluation(period=100)]
        )

Usage example
# Create the online learner (window ≈ 10 years of daily data, 21-day update cadence)
online_model = OnlineLightGBM(params, window_size=2520, update_freq=21)
# Initial fit
X_initial, y_initial = X[:2520], y[:2520]
online_model.initial_train(X_initial, y_initial)
# Walk forward: update on each 21-day batch, then predict the next unseen one
for i in range(2520, len(X) - 42, 21):
    X_batch, y_batch = X[i:i+21], y[i:i+21]
    online_model.update(X_batch, y_batch)
    # Predict the next, still-unseen batch
    y_pred = online_model.model.predict(X[i+21:i+42])

3. Advanced Training Techniques
3.1 Learning-Rate Scheduling
Learning-rate decay
# Define the decay schedule
def learning_rate_decay(current_iter, total_iter, init_lr=0.1, decay_power=0.99):
    """
    Exponential learning-rate decay.
    Args:
        current_iter: current boosting iteration
        total_iter: total iterations (unused here; kept so all schedules share one signature)
        init_lr: initial learning rate
        decay_power: per-iteration decay factor
    """
    return init_lr * (decay_power ** current_iter)

# Use it during training
num_iterations = 1000
callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        learning_rate=lambda iter: learning_rate_decay(iter, num_iterations, init_lr=0.1, decay_power=0.99)
    )
]
model = lgb.train(
    params,
    train_data,
    num_boost_round=num_iterations,
    valid_sets=[train_data, val_data],
    callbacks=callbacks
)

Cosine annealing
def cosine_annealing_lr(current_iter, total_iter, init_lr=0.1, min_lr=0.001):
    """Cosine-annealed learning rate, decaying from init_lr to min_lr."""
    cosine = (1 + np.cos(np.pi * current_iter / total_iter)) / 2
    return min_lr + (init_lr - min_lr) * cosine

callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        learning_rate=lambda iter: cosine_annealing_lr(iter, num_iterations, init_lr=0.1, min_lr=0.001)
    )
]

3.2 Feature-Sampling Strategies
Dynamic feature sampling
class DynamicFeatureSampler:
    """
    Stage-based feature-fraction scheduler.

    Schedule:
    1. Early (first 30% of iterations): use all features, learn quickly
    2. Middle: sample 80% of the features
    3. Late: sample 50%, fine-tuning on the strongest splits

    `importance` is a placeholder for an importance-weighted extension; the
    fractions below depend only on training progress.
    """
    def __init__(self, n_features, importance=None):
        self.n_features = n_features
        self.importance = importance

    def get_feature_fraction(self, iteration, total_iterations):
        """Return the feature_fraction for the current stage."""
        progress = iteration / total_iterations
        if progress < 0.3:
            return 1.0  # early: all features
        elif progress < 0.7:
            return 0.8  # middle
        else:
            return 0.5  # late: concentrate on the important features

# Use the dynamic sampler
sampler = DynamicFeatureSampler(X.shape[1])
callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        feature_fraction=lambda iter: sampler.get_feature_fraction(iter, num_iterations)
    )
]

3.3 Handling Class Imbalance
Weight adjustment
# scale_pos_weight applies to binary classification, so recast the task as
# predicting the sign of the return
y_binary = (y > 0).astype(int)
pos_samples = np.sum(y_binary == 1)
neg_samples = np.sum(y_binary == 0)
scale_pos_weight = neg_samples / pos_samples
print(f"Positive samples: {pos_samples}, negative samples: {neg_samples}")
print(f"Weight ratio: {scale_pos_weight:.2f}")
# Parameters
params = {
    'objective': 'binary',
    'metric': 'auc',
    'scale_pos_weight': scale_pos_weight,  # up-weight the positive class
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'verbosity': -1,
}

Custom sample weights
# Up-weight hard-to-predict samples, using the residuals of a previously
# trained baseline model
train_residuals = y_train - model.predict(X_train)
residual_std = np.std(train_residuals)
# Larger residual -> larger weight
sample_weights = 1 + np.abs(train_residuals) / residual_std
# Dataset with weights
train_data_weighted = lgb.Dataset(X_train, label=y_train, weight=sample_weights)
# Train
model = lgb.train(
    params,
    train_data_weighted,
    num_boost_round=1000,
    valid_sets=[train_data_weighted, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

3.4 Early Stopping in Detail
What early stopping does:
Training-curve sketch:
MSE ↑
 │  training set
 │   ╲
 │    ╲─────────────→ keeps falling (memorizing the data)
 │  validation set
 │   ╲
 │    ╲___  best point
 │        ╲_______↗
 │              ╱  overfitting starts!
 │             ╱
 └────────────────────────→ iterations
How it works:
- Compute the validation MSE after every round
- No improvement for N consecutive rounds → stop training
- Return the model from the best iteration; the sketch below spells out the mechanism
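The mechanism is just a patience counter on the validation score; a minimal pure-Python sketch (the loss values are made up):
val_losses = [0.50, 0.42, 0.40, 0.41, 0.40, 0.43, 0.44]  # hypothetical per-round losses
patience = 3
best_loss, best_round = float('inf'), 0
for rnd, loss in enumerate(val_losses):
    if loss < best_loss:
        best_loss, best_round = loss, rnd  # improvement: remember it and reset the counter
    elif rnd - best_round >= patience:
        print(f"stop at round {rnd}; best round was {best_round} (loss {best_loss})")
        break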
Code:
lgb.early_stopping(stopping_rounds=30)  # stop after 30 rounds with no improvement

Full example:
# Train with early stopping
model = lgb.train(
    params,
    train_data,
    num_boost_round=500,  # at most 500 trees
    valid_sets=[train_data, val_data],
    valid_names=['train', 'valid'],
    callbacks=[
        lgb.early_stopping(stopping_rounds=30),  # stop after 30 rounds with no improvement
        lgb.log_evaluation(period=50)            # log every 50 rounds
    ]
)
# Sample output:
# Training until validation scores don't improve for 30 rounds
# [50]  train's l2: 0.00347577  valid's l2: 0.00422996
# Early stopping, best iteration is:
# [57]  train's l2: 0.00345507  valid's l2: 0.00422585
# (elapsed on this sample run: 0:00:00.584752)
print("✅ Training finished!")
print(f"  Best iteration: {model.best_iteration}")
print(f"  Number of trees: {model.num_trees()}")

3.5 Prediction and Quick Evaluation
# Predict (assumes a chronological train/val/test split; X_test and y_test
# are the held-out tail)
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)
y_pred_test = model.predict(X_test)
# Quick evaluation
from sklearn.metrics import mean_squared_error, r2_score
mse_train = mean_squared_error(y_train, y_pred_train)
mse_val = mean_squared_error(y_val, y_pred_val)
mse_test = mean_squared_error(y_test, y_pred_test)
print("MSE:")
print(f"  train: {mse_train:.6f}")
print(f"  val:   {mse_val:.6f}")
print(f"  test:  {mse_test:.6f}")
# Overfitting check
if mse_train < mse_val * 0.5:
    print("⚠️ Warning: training MSE is far below validation MSE -- possible overfitting!")
else:
    print("✅ Overfitting check passed")

3.6 Refining the Early-Stopping Strategy
Multi-level early stopping
class MultiLevelEarlyStopping:
    """
    Patience-based early stopping on validation IC.

    Signals that can be layered on the same pattern:
    1. Training-set metric: watch for overfitting
    2. Validation IC: watch predictive power (implemented below)
    3. IC decay over a trailing window: watch for regime change
    """
    def __init__(self, X_val, y_val, stopping_rounds=100, min_delta=0.001):
        # The CallbackEnv passed to LightGBM callbacks does not expose the
        # raw validation data, so the callback keeps its own reference
        self.X_val = X_val
        self.y_val = y_val
        self.stopping_rounds = stopping_rounds
        self.min_delta = min_delta
        self.best_ic = -np.inf
        self.best_round = 0
        self.ic_history = []

    def __call__(self, env):
        """Invoked after every boosting round."""
        y_pred = env.model.predict(self.X_val)
        ic = np.corrcoef(y_pred, self.y_val)[0, 1]
        self.ic_history.append(ic)
        # Improvement?
        if ic > self.best_ic + self.min_delta:
            self.best_ic = ic
            self.best_round = env.iteration
        elif env.iteration - self.best_round >= self.stopping_rounds:
            # lgb.train stops when a callback raises EarlyStopException
            # (the mechanism LightGBM's own early_stopping callback uses)
            raise lgb.callback.EarlyStopException(self.best_round, env.evaluation_result_list)
        print(f"Round {env.iteration}: IC = {ic:.4f}, Best IC = {self.best_ic:.4f}")

# Use the custom early stopping
callbacks = [
    lgb.log_evaluation(period=100),
    MultiLevelEarlyStopping(X_val, y_val, stopping_rounds=100, min_delta=0.001)
]

4. Distributed Training
4.1 Multi-GPU Training
# GPU training
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    # 'num_gpu' is honored by the CUDA build only ('device': 'cuda'); the
    # OpenCL build ('device': 'gpu', configured via gpu_platform_id and
    # gpu_device_id) runs on a single device
    'device': 'cuda',
    'num_gpu': 2,  # use 2 GPUs
    'verbosity': -1,
}
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

4.2 Multi-Machine Training
# Data-parallel training across machines: every machine runs the same script
# on its own shard of the data
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'tree_learner': 'data_parallel',  # data-parallel tree learner
    'num_machines': 4,                # 4 machines
    'machines': '192.168.1.1:12345,192.168.1.2:12345,192.168.1.3:12345,192.168.1.4:12345',
    'local_listen_port': 12345,       # port this machine listens on
    'verbosity': -1,
}
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

5. Saving and Loading Models
5.1 Saving a Model
# Save the model (LightGBM text format)
model.save_model('lightgbm_model.txt')
# save_model always writes the text format regardless of the extension; for a
# (more readable) JSON dump, use dump_model()
import json
with open('lightgbm_model.json', 'w') as f:
    json.dump(model.dump_model(), f)
# Pickle the Booster object
import joblib
joblib.dump(model, 'lightgbm_model.pkl')

5.2 Loading a Model
# Load a model
model = lgb.Booster(model_file='lightgbm_model.txt')
# Load and continue training
model = lgb.Booster(model_file='lightgbm_model.txt')
model = lgb.train(
    params,
    train_data,
    num_boost_round=100,
    init_model=model,  # continue from the loaded trees
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

5.3 Model Versioning
import os
import json
from datetime import datetime

class ModelVersioning:
    """
    Model version management.

    Features:
    1. Save a model together with its metadata
    2. Version control via timestamps
    3. Model comparison
    """
    def __init__(self, model_dir='models'):
        self.model_dir = model_dir
        os.makedirs(model_dir, exist_ok=True)

    def save_model(self, model, metadata):
        """
        Save a model and its metadata.
        metadata example:
        {
            'train_ic': 0.05,
            'val_ic': 0.03,
            'params': {...},
            'train_date': '2024-01-01',
            ...
        }
        """
        version = datetime.now().strftime('%Y%m%d_%H%M%S')
        model_name = f"model_{version}"
        # Save the model
        model_path = os.path.join(self.model_dir, f"{model_name}.txt")
        model.save_model(model_path)
        # Save the metadata (default=str keeps non-JSON-serializable entries,
        # e.g. a callable objective in params, from crashing the dump)
        metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)
        print(f"Model saved: {model_name}")
        return model_name

    def load_model(self, model_name):
        """Load a model and its metadata."""
        model_path = os.path.join(self.model_dir, f"{model_name}.txt")
        metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
        # Model
        model = lgb.Booster(model_file=model_path)
        # Metadata
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
        return model, metadata

    def list_models(self):
        """List all saved models."""
        models = []
        for file in os.listdir(self.model_dir):
            if file.endswith('.txt'):
                model_name = file.replace('.txt', '')
                metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
                if os.path.exists(metadata_path):
                    with open(metadata_path, 'r') as f:
                        metadata = json.load(f)
                    models.append((model_name, metadata))
        return models

Usage example
# Create the version manager
version_manager = ModelVersioning()
# Save the model with its metadata
metadata = {
    'train_ic': train_ic,
    'val_ic': val_ic,
    'params': params,
    'train_date': datetime.now().strftime('%Y-%m-%d'),
    'train_samples': len(X_train),
    'val_samples': len(X_val),
}
model_name = version_manager.save_model(model, metadata)
# List all saved models
models = version_manager.list_models()
for name, meta in models:
    print(f"{name}: IC={meta['val_ic']:.4f}, Date={meta['train_date']}")

6. Summary
In quantitative settings, LightGBM training calls for particular attention to:
- The training workflow: data preparation, parameter setup, training, evaluation
- Quant-specific strategies: IC-oriented objectives, group-wise training, online learning
- Advanced techniques: learning-rate scheduling, feature sampling, class-imbalance handling
- Distributed training: multi-GPU and multi-machine setups
- Model management: saving, loading, version control
The right training strategy is a key driver of model performance and should be adapted to the specific setting.