Model Training

1. LightGBM Training Workflow

1.1 Basic Training Workflow

Data Preparation

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
 
# Load data (random placeholders standing in for real features and labels)
X = np.random.randn(10000, 100)  # 10000 samples, 100 features
y = np.random.randn(10000)  # target variable (future returns to predict)
 
# Split into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, shuffle=False  # never shuffle time-series data
)
 
# Create LightGBM datasets
train_data = lgb.Dataset(X_train, label=y_train)
val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

Parameter Settings

params = {
    # Basic parameters
    'objective': 'regression',
    'metric': 'rmse',
 
    # Model complexity
    'num_leaves': 31,
    'max_depth': -1,
    'min_data_in_leaf': 20,
 
    # Learning parameters
    'learning_rate': 0.05,
    # (the number of boosting rounds is passed to lgb.train as num_boost_round,
    # so n_estimators is omitted here to avoid a conflicting alias)
 
    # Regularization
    'lambda_l1': 0.0,
    'lambda_l2': 0.0,
 
    # Sampling
    'bagging_fraction': 0.8,
    'feature_fraction': 0.8,
    'bagging_freq': 5,
 
    # Misc
    'verbosity': -1,
    'n_jobs': -1,
}

Training the Model

# Train the model
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)
 
# Predict
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)
 
# Evaluate
from sklearn.metrics import mean_squared_error, r2_score
from scipy.stats import pearsonr, spearmanr
 
train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
val_rmse = np.sqrt(mean_squared_error(y_val, y_pred_val))
 
train_r2 = r2_score(y_train, y_pred_train)
val_r2 = r2_score(y_val, y_pred_val)
 
train_ic = pearsonr(y_train, y_pred_train)[0]
val_ic = pearsonr(y_val, y_pred_val)[0]
 
train_rank_ic = spearmanr(y_train, y_pred_train)[0]
val_rank_ic = spearmanr(y_val, y_pred_val)[0]
 
print(f"Train RMSE: {train_rmse:.4f}, Val RMSE: {val_rmse:.4f}")
print(f"Train R2: {train_r2:.4f}, Val R2: {val_r2:.4f}")
print(f"Train IC: {train_ic:.4f}, Val IC: {val_ic:.4f}")
print(f"Train Rank IC: {train_rank_ic:.4f}, Val Rank IC: {val_rank_ic:.4f}")

1.2 Using the sklearn API

Advantages

  • Seamless integration with the sklearn ecosystem
  • Works with Pipeline and GridSearchCV (a Pipeline sketch follows the code example below)
  • Familiar to sklearn users

Code Example

from lightgbm import LGBMRegressor
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
 
# sklearn-style estimator (native LightGBM parameter names pass through as aliases)
model = LGBMRegressor(
    objective='regression',
    num_leaves=31,
    learning_rate=0.05,
    n_estimators=1000,
    min_data_in_leaf=20,
    bagging_fraction=0.8,
    feature_fraction=0.8,
    bagging_freq=5,
    verbosity=-1,
    n_jobs=-1,
)
 
# Train
model.fit(
    X_train, y_train,
    eval_set=[(X_train, y_train), (X_val, y_val)],
    eval_metric='rmse',
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=False),
        lgb.log_evaluation(period=100)
    ]
)
 
# Predict
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)
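
The sklearn API also drops straight into a Pipeline. A minimal sketch (the StandardScaler step is purely illustrative; tree models do not need feature scaling):

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
 
# Preprocessing + model wrapped as a single estimator
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('lgbm', LGBMRegressor(num_leaves=31, learning_rate=0.05, n_estimators=500)),
])
pipe.fit(X_train, y_train)
y_pred_val = pipe.predict(X_val)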

Hyperparameter Tuning

# Define the parameter grid
param_grid = {
    'num_leaves': [31, 63, 127],
    'learning_rate': [0.01, 0.05, 0.1],
    'min_data_in_leaf': [10, 20, 50],
    'bagging_fraction': [0.7, 0.8, 0.9],
    'feature_fraction': [0.7, 0.8, 0.9],
}
 
# Time-series cross-validation (folds respect temporal order)
tscv = TimeSeriesSplit(n_splits=5)
 
# Grid search
grid_search = GridSearchCV(
    estimator=LGBMRegressor(objective='regression', n_estimators=1000),
    param_grid=param_grid,
    cv=tscv,
    scoring='neg_root_mean_squared_error',
    n_jobs=-1,
    verbose=2
)
 
grid_search.fit(X_train, y_train)
 
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {-grid_search.best_score_:.4f}")
 
# Use the best model
best_model = grid_search.best_estimator_

2. Special Training Strategies for Quantitative Scenarios

2.1 Training Optimized for IC

Why does IC matter?

In quantitative investing, what matters is how well predictions rank assets against realized outcomes, measured by the information coefficient (IC, the Pearson correlation between predictions and realized values) and Rank IC (the Spearman version), rather than the exact prediction error (RMSE).
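
A tiny hand-built illustration: prediction A has a large RMSE but ranks the four assets perfectly, while prediction B has a small RMSE but swaps the top two names.

import numpy as np
from scipy.stats import spearmanr
 
y_true = np.array([0.03, 0.01, -0.02, -0.04])
pred_a = np.array([0.30, 0.10, -0.20, -0.40])      # biased scale, perfect ordering
pred_b = np.array([0.009, 0.031, -0.021, -0.039])  # tiny errors, top two swapped
 
for name, p in [('A', pred_a), ('B', pred_b)]:
    rmse = np.sqrt(np.mean((p - y_true) ** 2))
    rank_ic = spearmanr(p, y_true)[0]
    print(f"{name}: RMSE={rmse:.4f}, Rank IC={rank_ic:.2f}")
# A: RMSE=0.2465, Rank IC=1.00
# B: RMSE=0.0149, Rank IC=0.80

For a portfolio built by ranking assets, signal A is the more useful one despite its far worse RMSE.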

Custom Loss Function

def ic_loss(preds, train_data):
    """
    IC-based loss (train to minimize -IC)
 
    Idea: maximize the Pearson correlation between predictions and labels.
    """
    labels = train_data.get_label()
    n = len(labels)
 
    # Centered values and standard deviations
    p_c = preds - preds.mean()
    y_c = labels - labels.mean()
    std_pred = np.std(preds)
    std_label = np.std(labels)
 
    # IC = Cov(pred, label) / (std(pred) * std(label))
    ic = np.mean(p_c * y_c) / (std_pred * std_label)
 
    # dIC/dpred_i = (y_c_i - IC * (std_label/std_pred) * p_c_i) / (n * std_pred * std_label)
    # loss = -IC, so the gradient flips sign
    grad = -(y_c - ic * (std_label / std_pred) * p_c) / (n * std_pred * std_label)
 
    # Hessian: approximated as a constant (the exact second derivative is dense)
    hess = np.ones_like(preds)
 
    return grad, hess
 
def ic_metric(preds, train_data):
    """IC evaluation metric (higher is better)"""
    labels = train_data.get_label()
    ic = np.corrcoef(preds, labels)[0, 1]
    return 'ic', ic, True

Using the Custom Loss Function

# Define the parameters. Since LightGBM 4.0 the fobj argument of lgb.train
# has been removed: the callable objective is passed via params instead
# (on LightGBM < 4.0, keep 'objective' out of params and pass fobj=ic_loss)
params = {
    'objective': ic_loss,  # custom objective
    'metric': 'None',      # disable built-in metrics; evaluation comes from feval
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'bagging_fraction': 0.8,
    'feature_fraction': 0.8,
    'bagging_freq': 5,
    'verbosity': -1,
}
 
# Train
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    feval=ic_metric,  # custom evaluation metric
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

Rank IC Optimization

def rank_ic_loss(preds, train_data):
    """
    Rank-IC-based loss
 
    Idea: maximize the Spearman rank correlation between predictions and
    labels. Ranking is not differentiable, so the ranks are treated as
    fixed and a Pearson-style gradient is applied to them as a heuristic.
    """
    labels = train_data.get_label()
 
    # Compute ranks
    rank_pred = pd.Series(preds).rank().to_numpy()
    rank_label = pd.Series(labels).rank().to_numpy()
 
    # Rank IC = Pearson correlation of the ranks
    rank_ic = np.corrcoef(rank_pred, rank_label)[0, 1]
 
    # Heuristic gradient on the centered ranks
    rp_c = rank_pred - rank_pred.mean()
    rl_c = rank_label - rank_label.mean()
    grad = -(rl_c - rank_ic * rp_c) / (np.std(rank_pred) * np.std(rank_label))
    hess = np.ones_like(preds)
 
    return grad, hess
 
def rank_ic_metric(preds, train_data):
    """Rank IC evaluation metric (higher is better)"""
    labels = train_data.get_label()
    rank_pred = pd.Series(preds).rank().to_numpy()
    rank_label = pd.Series(labels).rank().to_numpy()
    rank_ic = np.corrcoef(rank_pred, rank_label)[0, 1]
    return 'rank_ic', rank_ic, True

2.2 Group-wise Training

Why is grouping needed?

Quantitative data usually has a hierarchical structure:
 
  • a time dimension (samples from different trading days)
  • a stock dimension (samples from different stocks)
 
Samples from the same group must not be split across the training and validation sets; a group-aware split is sketched below.
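
A minimal sketch using sklearn's GroupKFold, which guarantees that no group appears in both folds (the `times` group ids here mirror the snippet below):

import numpy as np
from sklearn.model_selection import GroupKFold
 
times = np.arange(len(X)) // 21  # per-sample time-group ids
 
# Group-aware CV: samples sharing a time group never cross folds
gkf = GroupKFold(n_splits=5)
for train_idx, val_idx in gkf.split(X, y, groups=times):
    assert set(times[train_idx]).isdisjoint(times[val_idx])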

Code Implementation

# Assume time and stock identifiers (data is sorted by time, so groups are contiguous)
times = np.arange(len(X)) // 21   # every 21 trading days form one time group
stocks = np.arange(len(X)) % 100  # assume 100 stocks
 
# Note: lgb.Dataset's `group` parameter expects the *size* of each
# consecutive group (used by ranking objectives), not per-sample group ids
n_train = len(X_train)
train_group_sizes = np.unique(times[:n_train], return_counts=True)[1]
val_group_sizes = np.unique(times[n_train:], return_counts=True)[1]
 
train_data = lgb.Dataset(X_train, label=y_train, group=train_group_sizes)
val_data = lgb.Dataset(X_val, label=y_val, group=val_group_sizes, reference=train_data)

LambdaRank Training

# LambdaRank (suited to ranking tasks)
# Note: lambdarank expects nonnegative integer relevance labels,
# e.g. returns bucketed into discrete grades, not raw continuous returns
params = {
    'objective': 'lambdarank',
    'metric': 'ndcg',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'verbosity': -1,
}
 
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

2.3 Online Learning

When to Use

  • Data arrives continuously
  • The model must be updated in near real time
  • Market conditions change quickly

Incremental Training

class OnlineLightGBM:
    """
    Online-learning LightGBM
 
    Strategy:
    1. Initial training on historical data
    2. Incremental updates with new data at a fixed frequency
    3. Sliding window to bound the size of the training set
    """
 
    def __init__(self, params, window_size=1000, update_freq=100):
        self.params = params
        self.window_size = window_size
        self.update_freq = update_freq  # batch size the caller uses to schedule updates
        self.model = None
        # Keep raw arrays; lgb.Dataset may free its raw data after construction
        self.X_train = None
        self.y_train = None
 
    def initial_train(self, X, y):
        """Initial training"""
        self.X_train, self.y_train = X, y
        train_data = lgb.Dataset(X, label=y)
        # No validation set here, so no early-stopping callback
        self.model = lgb.train(
            self.params,
            train_data,
            num_boost_round=1000,
            callbacks=[lgb.log_evaluation(period=100)]
        )
 
    def update(self, X_new, y_new):
        """Incremental update"""
        if self.model is None:
            self.initial_train(X_new, y_new)
            return
 
        # Append the new data and apply the sliding window
        X_combined = np.vstack([self.X_train, X_new])[-self.window_size:]
        y_combined = np.hstack([self.y_train, y_new])[-self.window_size:]
        self.X_train, self.y_train = X_combined, y_combined
 
        # Continue training from the existing model
        train_data = lgb.Dataset(X_combined, label=y_combined)
        self.model = lgb.train(
            self.params,
            train_data,
            num_boost_round=100,
            init_model=self.model,  # warm start from the previous booster
            callbacks=[lgb.log_evaluation(period=100)]
        )

Usage Example

# Create the online-learning model (2520 ≈ ten trading years; 21 ≈ one trading month)
online_model = OnlineLightGBM(params, window_size=2520, update_freq=21)
 
# Initial training
X_initial, y_initial = X[:2520], y[:2520]
online_model.initial_train(X_initial, y_initial)
 
# Online updates
for i in range(2520, len(X), 21):
    X_batch, y_batch = X[i:i+21], y[i:i+21]
    online_model.update(X_batch, y_batch)
 
    # Predict the next month's batch
    y_pred = online_model.model.predict(X[i+21:i+42])

3. Advanced Training Techniques

3.1 Learning-Rate Scheduling

Learning-Rate Decay

# Define the learning-rate decay function
def learning_rate_decay(current_iter, init_lr=0.1, decay_power=0.99):
    """
    Exponential learning-rate decay
 
    Args:
        current_iter: current iteration number
        init_lr: initial learning rate
        decay_power: per-iteration decay factor
    """
    return init_lr * (decay_power ** current_iter)
 
# Use it during training
num_iterations = 1000
callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        learning_rate=lambda i: learning_rate_decay(i, init_lr=0.1, decay_power=0.99)
    )
]
 
model = lgb.train(
    params,
    train_data,
    num_boost_round=num_iterations,
    valid_sets=[train_data, val_data],
    callbacks=callbacks
)

Cosine Annealing

def cosine_annealing_lr(current_iter, total_iter, init_lr=0.1, min_lr=0.001):
    """
    Cosine-annealing learning-rate schedule
    """
    cosine = (1 + np.cos(np.pi * current_iter / total_iter)) / 2
    return min_lr + (init_lr - min_lr) * cosine
 
callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        learning_rate=lambda i: cosine_annealing_lr(i, num_iterations, init_lr=0.1, min_lr=0.001)
    )
]
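
A quick check of the schedule's endpoints (with num_iterations = 1000 as above):

# Starts at init_lr, hits the midpoint halfway, and ends near min_lr
for i in [0, 500, 999]:
    print(f"iter {i}: lr = {cosine_annealing_lr(i, 1000):.4f}")
# iter 0: lr = 0.1000
# iter 500: lr = 0.0505
# iter 999: lr = 0.0010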

3.2 Feature-Sampling Strategies

Dynamic Feature Sampling

class DynamicFeatureSampler:
    """
    Dynamic feature sampler
 
    Strategy:
    1. Early stage: use all features for fast learning
    2. Middle stage: sample a subset of features
    3. Late stage: focus on a smaller subset for fine-tuning
 
    (`importance` is a placeholder for an importance-weighted variant;
    the fractions below are fixed per stage.)
    """
 
    def __init__(self, n_features, importance=None):
        self.n_features = n_features
        self.importance = importance
 
    def get_feature_fraction(self, iteration, total_iterations):
        """
        Return the feature_fraction for the current training stage
        """
        progress = iteration / total_iterations
 
        if progress < 0.3:
            # Early stage: use 100% of the features
            return 1.0
        elif progress < 0.7:
            # Middle stage: sample 80%
            return 0.8
        else:
            # Late stage: sample 50%
            return 0.5
 
# Use the dynamic sampler
sampler = DynamicFeatureSampler(X.shape[1])
callbacks = [
    lgb.early_stopping(stopping_rounds=50, verbose=True),
    lgb.log_evaluation(period=100),
    lgb.reset_parameter(
        feature_fraction=lambda i: sampler.get_feature_fraction(i, num_iterations)
    )
]

3.3 Handling Class Imbalance

Weight Adjustment

# Compute the positive/negative sample ratio (e.g. positive vs. negative returns)
pos_samples = np.sum(y > 0)
neg_samples = np.sum(y <= 0)
scale_pos_weight = neg_samples / pos_samples
 
print(f"Positive samples: {pos_samples}, Negative samples: {neg_samples}")
print(f"Weight ratio: {scale_pos_weight:.2f}")
 
# Set parameters. Note: scale_pos_weight only takes effect for classification
# objectives, so frame the task as binary classification (the sign of returns,
# e.g. y_binary = (y > 0).astype(int) as the training label)
params = {
    'objective': 'binary',
    'metric': 'auc',
    'scale_pos_weight': scale_pos_weight,  # weight of the positive class
    'num_leaves': 31,
    'learning_rate': 0.05,
    'min_data_in_leaf': 20,
    'verbosity': -1,
}

Custom Sample Weights

# Compute sample weights from the residuals of a previously trained model:
# hard-to-predict samples get higher weight
train_residuals = y_train - model.predict(X_train)
residual_std = np.std(train_residuals)
 
# The larger the residual, the larger the weight
sample_weights = 1 + np.abs(train_residuals) / residual_std
 
# Create a weighted dataset
train_data_weighted = lgb.Dataset(X_train, label=y_train, weight=sample_weights)
 
# Train
model = lgb.train(
    params,
    train_data_weighted,
    num_boost_round=1000,
    valid_sets=[train_data_weighted, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

3.4 Early Stopping in Detail

What early stopping does:

Schematic training curves:

MSE ↑
    │   train
    │     ╲
    │      ╲─────────────→  keeps falling (memorizing the data)
    │       validation
    │         ╲
    │          ╲___          best point
    │              ╲_______↗
    │                 ╱ overfitting begins!
    │               ╱
    └────────────────────────→ iterations

How it works:
 
  1. Compute the validation-set MSE every round
  2. If there is no improvement for N consecutive rounds → stop training
  3. Return the model from the best iteration

Code:

lgb.early_stopping(stopping_rounds=30)  # stop after 30 rounds without improvement

Complete example:

from datetime import datetime
 
# Train with early stopping
start_time = datetime.now()
model = lgb.train(
    params,
    train_data,
    num_boost_round=500,                # at most 500 trees
    valid_sets=[train_data, val_data],
    valid_names=['train', 'valid'],
    callbacks=[
        lgb.early_stopping(stopping_rounds=30),  # stop after 30 rounds without improvement
        lgb.log_evaluation(period=50)            # log every 50 rounds
    ]
)
 
# Sample output:
# Training until validation scores don't improve for 30 rounds
# [50]    train's l2: 0.00347577    valid's l2: 0.00422996
# Early stopping, best iteration is:
# [57]    train's l2: 0.00345507    valid's l2: 0.00422585
 
print(f"✅ Training finished!")
print(f"   Training time: {datetime.now() - start_time}")
print(f"   Best iteration: {model.best_iteration}")
print(f"   Number of trees: {model.num_trees()}")

3.5 Prediction and Simple Evaluation

# Predict (assumes an additional held-out test split X_test / y_test)
y_pred_train = model.predict(X_train)
y_pred_val = model.predict(X_val)
y_pred_test = model.predict(X_test)
 
# Simple evaluation
from sklearn.metrics import mean_squared_error, r2_score
 
mse_train = mean_squared_error(y_train, y_pred_train)
mse_val = mean_squared_error(y_val, y_pred_val)
mse_test = mean_squared_error(y_test, y_pred_test)
 
print(f"MSE evaluation:")
print(f"  Train: {mse_train:.6f}")
print(f"  Validation: {mse_val:.6f}")
print(f"  Test: {mse_test:.6f}")
 
# Check for overfitting
if mse_train < mse_val * 0.5:
    print(f"⚠️ Warning: training MSE is far below validation MSE; the model may be overfitting!")
else:
    print(f"✅ Overfitting check passed")

3.6 Refining the Early-Stopping Strategy

Multi-Level Early Stopping

class MultiLevelEarlyStopping:
    """
    Multi-level early stopping as a custom callback
 
    Strategy:
    1. Monitor the validation IC every round
    2. Stop once IC has not improved for `stopping_rounds` rounds
    (the same pattern extends to other signals, e.g. train-set
    metrics for overfitting, or IC decay)
    """
 
    def __init__(self, X_val, y_val, stopping_rounds=100, min_delta=0.001):
        # LightGBM's CallbackEnv does not carry the validation data,
        # so the callback keeps its own reference
        self.X_val = X_val
        self.y_val = y_val
        self.stopping_rounds = stopping_rounds
        self.min_delta = min_delta
        self.best_ic = -np.inf
        self.best_round = 0
        self.ic_history = []
 
    def __call__(self, env):
        """
        Callback invoked after every boosting round
        """
        # Compute the validation IC
        y_pred = env.model.predict(self.X_val)
        ic = np.corrcoef(y_pred, self.y_val)[0, 1]
        self.ic_history.append(ic)
 
        # Check for improvement
        if ic > self.best_ic + self.min_delta:
            self.best_ic = ic
            self.best_round = env.iteration
        elif env.iteration - self.best_round >= self.stopping_rounds:
            # Stop early via the same exception LightGBM's built-in callback raises
            raise lgb.callback.EarlyStopException(self.best_round, env.evaluation_result_list)
 
        print(f"Round {env.iteration}: IC = {ic:.4f}, Best IC = {self.best_ic:.4f}")
 
# Use the custom early stopping
callbacks = [
    lgb.log_evaluation(period=100),
    MultiLevelEarlyStopping(X_val, y_val, stopping_rounds=100, min_delta=0.001)
]

4. Distributed Training

4.1 Multi-GPU Training

# Multi-GPU training (requires a GPU-enabled LightGBM build)
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'device_type': 'cuda',  # CUDA build; the OpenCL build uses 'gpu' plus gpu_platform_id / gpu_device_id
    'num_gpu': 2,           # number of GPUs; only honored by the CUDA implementation
    'verbosity': -1,
}
 
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

4.2 Multi-Machine Training

# Assume several machines, each holding a shard of the data.
# There is no dedicated master: every machine runs the same script
# with the same configuration on its local shard.
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'tree_learner': 'data_parallel',  # data-parallel training
    'num_machines': 4,  # 4 machines
    'machines': '192.168.1.1:12345,192.168.1.2:12345,192.168.1.3:12345,192.168.1.4:12345',
    'verbosity': -1,
}
 
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

5. Model Saving and Loading

5.1 Saving a Model

# Save the model (LightGBM's text format)
model.save_model('lightgbm_model.txt')
 
# Dump to JSON (more readable). Note: save_model always writes the text
# format regardless of the file extension, so use dump_model() for JSON
import json
with open('lightgbm_model.json', 'w') as f:
    json.dump(model.dump_model(), f)
 
# Pickle the Booster object
import joblib
joblib.dump(model, 'lightgbm_model.pkl')

5.2 Loading a Model

# Load the model
model = lgb.Booster(model_file='lightgbm_model.txt')
 
# Continue training from the loaded model
model = lgb.train(
    params,
    train_data,
    num_boost_round=100,
    init_model=model,  # warm start
    valid_sets=[train_data, val_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50, verbose=True),
        lgb.log_evaluation(period=100)
    ]
)

5.3 Model Version Management

import os
import json
from datetime import datetime
 
class ModelVersioning:
    """
    Model version management
 
    Features:
    1. Save a model together with its metadata
    2. Timestamp-based version control
    3. Model comparison (see the usage example below)
    """
 
    def __init__(self, model_dir='models'):
        self.model_dir = model_dir
        os.makedirs(model_dir, exist_ok=True)
 
    def save_model(self, model, metadata):
        """
        Save a model and its metadata
 
        metadata: {
            'train_ic': 0.05,
            'val_ic': 0.03,
            'params': {...},
            'train_date': '2024-01-01',
            ...
        }
        """
        version = datetime.now().strftime('%Y%m%d_%H%M%S')
        model_name = f"model_{version}"
 
        # Save the model
        model_path = os.path.join(self.model_dir, f"{model_name}.txt")
        model.save_model(model_path)
 
        # Save the metadata
        metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
 
        print(f"模型已保存: {model_name}")
 
        return model_name
 
    def load_model(self, model_name):
        """加载模型"""
        model_path = os.path.join(self.model_dir, f"{model_name}.txt")
        metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
 
        # Load the model
        model = lgb.Booster(model_file=model_path)
 
        # Load the metadata
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
 
        return model, metadata
 
    def list_models(self):
        """列出所有模型"""
        models = []
        for file in os.listdir(self.model_dir):
            if file.endswith('.txt'):
                model_name = file.replace('.txt', '')
                metadata_path = os.path.join(self.model_dir, f"{model_name}_metadata.json")
 
                if os.path.exists(metadata_path):
                    with open(metadata_path, 'r') as f:
                        metadata = json.load(f)
                    models.append((model_name, metadata))
 
        return models

Usage Example

# Create the version manager
version_manager = ModelVersioning()
 
# Save a model with its metadata
metadata = {
    'train_ic': train_ic,
    'val_ic': val_ic,
    'params': params,
    'train_date': datetime.now().strftime('%Y-%m-%d'),
    'train_samples': len(X_train),
    'val_samples': len(X_val),
}
 
model_name = version_manager.save_model(model, metadata)
 
# List all saved models
models = version_manager.list_models()
for name, meta in models:
    print(f"{name}: IC={meta['val_ic']:.4f}, Date={meta['train_date']}")

6. Summary

LightGBM training in quantitative settings requires particular attention to:
 
  1. The training workflow: data preparation, parameter settings, training, and evaluation
  2. Special training strategies: IC-oriented objectives, group-wise training, online learning
  3. Advanced techniques: learning-rate scheduling, feature sampling, class-imbalance handling
  4. Distributed training: multi-GPU and multi-machine setups
  5. Model management: saving, loading, and version control
 
The right training strategy is key to model performance and should be adapted to the specific scenario.