Model Training and Tuning

1. Basic Training Workflow

1.1 Basic Training with LightGBM

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
 
# Generate synthetic data
np.random.seed(42)
n_samples = 10000
n_features = 20
 
# Generate features
X = np.random.randn(n_samples, n_features)
feature_names = [f'factor_{i}' for i in range(n_features)]
 
# Generate the target (simulated future stock returns)
# Only some of the features are informative
true_coefs = np.zeros(n_features)
true_coefs[0] = 0.3   # momentum factor
true_coefs[1] = -0.2  # reversal factor
true_coefs[2] = 0.15  # volatility factor
true_coefs[5] = 0.1   # value factor
 
y = X @ true_coefs + np.random.randn(n_samples) * 0.5
 
# Chronological split (simulated)
split_idx = int(0.7 * n_samples)
X_train, X_val, X_test = X[:split_idx], X[split_idx:int(0.85*n_samples)], X[int(0.85*n_samples):]
y_train, y_val, y_test = y[:split_idx], y[split_idx:int(0.85*n_samples)], y[int(0.85*n_samples):]
 
print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")
 
# Method 1: the sklearn API
model = lgb.LGBMRegressor(
    n_estimators=1000,
    learning_rate=0.05,
    max_depth=6,
    num_leaves=31,
    min_child_samples=20,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_alpha=0.1,
    reg_lambda=1.0,
    random_state=42,
    n_jobs=-1,
    verbose=-1
)
 
model.fit(
    X_train, y_train,
    eval_set=[(X_val, y_val)],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50),
        lgb.log_evaluation(period=100)
    ]
)
 
# Predict
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"\n测试集 MSE: {mse:.4f}")
print(f"最佳迭代数: {model.best_iteration_}")

1.2 Basic Training with XGBoost

import xgboost as xgb
 
# Create DMatrix (XGBoost's optimized data structure)
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
dtest = xgb.DMatrix(X_test, label=y_test)
 
# Parameters
params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
    'max_depth': 6,
    'eta': 0.05,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'lambda': 1.0,
    'alpha': 0.1,
    'min_child_weight': 3,
    'seed': 42
}
 
# Train
evals_result = {}
bst = xgb.train(
    params,
    dtrain,
    num_boost_round=1000,
    evals=[(dtrain, 'train'), (dval, 'val')],
    early_stopping_rounds=50,
    evals_result=evals_result,
    verbose_eval=100
)
 
# Predict
y_pred_xgb = bst.predict(dtest)
mse_xgb = mean_squared_error(y_test, y_pred_xgb)
print(f"\nXGBoost 测试集 MSE: {mse_xgb:.4f}")
print(f"最佳迭代数: {bst.best_iteration}")

1.3 Using the Native API (More Efficient)

# LightGBM native API
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
 
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': 'rmse',
    'max_depth': 6,
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'min_child_samples': 20,
    'lambda_l1': 0.1,
    'lambda_l2': 1.0,
    'verbose': -1
}
 
gbm = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[train_data, valid_data],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50),
        lgb.log_evaluation(period=100)
    ]
)
 
y_pred_native = gbm.predict(X_test, num_iteration=gbm.best_iteration)
print(f"原生 API 测试集 MSE: {mean_squared_error(y_test, y_pred_native):.4f}")

2. Key Hyperparameters in Detail

2.1 Tree Structure Parameters

max_depth / num_leaves

These control the complexity of each tree.

| Parameter | LightGBM | XGBoost | Description | Typical values |
| --- | --- | --- | --- | --- |
| Tree depth | max_depth | max_depth | Maximum depth of a tree | 3-8 |
| Leaf count | num_leaves | controlled indirectly via max_depth | Maximum number of leaf nodes | 15-127 |

Key relationship: num_leaves <= 2^max_depth

# Visualize the relationship between depth and leaf count
import matplotlib.pyplot as plt
 
depths = range(3, 9)
max_leaves = [2**d for d in depths]
 
fig, ax = plt.subplots(figsize=(10, 4))
ax.bar(depths, max_leaves, color='steelblue', alpha=0.7)
ax.set_xlabel('Max Depth')
ax.set_ylabel('Maximum Leaves')
ax.set_title('Relationship between Depth and Leaves')
ax.set_xticks(depths)
for i, (d, leaves) in enumerate(zip(depths, max_leaves)):
    ax.text(d, leaves + 5, str(leaves), ha='center')
plt.tight_layout()
plt.show()
 
# Recommendation: choose num_leaves between 2^(max_depth-1) and 2^max_depth
print("Recommended configurations:")
for d in [4, 5, 6]:
    min_leaves = 2**(d-1)
    max_leaves = 2**d
    print(f"  max_depth={d}: num_leaves ∈ [{min_leaves}, {max_leaves}]")

min_child_samples / min_child_weight

Controls the minimum number of samples required in a leaf, guarding against overfitting.

| Parameter | LightGBM | XGBoost | Description | Typical values |
| --- | --- | --- | --- | --- |
| Min samples per leaf | min_child_samples | - | Minimum number of samples in a leaf | 10-100 |
| Min weight sum | - | min_child_weight | Minimum sum of Hessians in a leaf | 1-10 |

# Example: the effect of different min_child_samples values
min_child_values = [5, 20, 50, 100]
 
results = []
for mcs in min_child_values:
    model = lgb.LGBMRegressor(
        n_estimators=200,
        max_depth=6,
        min_child_samples=mcs,
        learning_rate=0.05,
        random_state=42,
        verbose=-1
    )
    model.fit(X_train, y_train)
    train_mse = mean_squared_error(y_train, model.predict(X_train))
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    results.append({
        'min_child_samples': mcs,
        'train_mse': train_mse,
        'val_mse': val_mse,
        'train_val_ratio': train_mse / val_mse  # well below 1 signals overfitting
    })
 
results_df = pd.DataFrame(results)
print("min_child_samples 影响:")
print(results_df)

2.2 Learning Parameters

learning_rate / eta

The learning rate scales the contribution of each tree.

| Learning rate | Behavior | Recommended tree count | Use case |
| --- | --- | --- | --- |
| 0.3-0.5 | Large steps | 100-300 | Quick experiments |
| 0.1 | Medium steps | 500-1000 | Standard setting |
| 0.05 | Small steps | 1000-3000 | Fine tuning |
| 0.01 | Very small steps | 3000+ | Best accuracy |

# Relationship between learning rate and best iteration count
learning_rates = [0.5, 0.1, 0.05, 0.01, 0.005]
 
print("学习率 | 最佳迭代数 | 训练时间(秒) | 验证MSE")
print("-" * 50)
 
import time
for lr in learning_rates:
    start = time.time()
    model = lgb.LGBMRegressor(
        learning_rate=lr,
        n_estimators=5000,
        max_depth=6,
        min_child_samples=20,
        random_state=42,
        verbose=-1
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
    )
    elapsed = time.time() - start
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    print(f"{lr:6.3f}  |    {model.best_iteration_:4d}    |   {elapsed:6.2f}   |  {val_mse:.4f}")

n_estimators / num_boost_round

The number of trees. Used together with early stopping, it does not need to be set precisely.

# Early-stopping demonstration
def train_with_early_stopping(early_stopping_rounds):
    """演示早停的影响"""
    model = lgb.LGBMRegressor(
        n_estimators=5000,  # deliberately large; early stopping picks the actual count
        learning_rate=0.05,
        max_depth=6,
        random_state=42,
        verbose=-1
    )
 
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[
            lgb.early_stopping(stopping_rounds=early_stopping_rounds),
            lgb.log_evaluation(period=0)
        ]
    )
 
    return model.best_iteration_
 
for esr in [10, 30, 50, 100, 200]:
    best_iter = train_with_early_stopping(esr)
    print(f"早停轮数={esr:3d}: 最佳迭代数={best_iter}")

2.3 Regularization Parameters

L1 / L2 Regularization

| Parameter | LightGBM | XGBoost | Effect | Typical values |
| --- | --- | --- | --- | --- |
| L1 | reg_alpha / lambda_l1 | alpha | Drives leaf weights toward zero (sparsity) | 0-1 |
| L2 | reg_lambda / lambda_l2 | lambda | Keeps leaf weights from growing too large | 0-10 |

# Demonstrate the effect of regularization
import itertools
 
alphas = [0, 0.01, 0.1, 1]
lambdas = [0, 0.1, 1, 10]
 
results_grid = []
for alpha, lambda_ in itertools.product(alphas, lambdas):
    model = lgb.LGBMRegressor(
        n_estimators=200,
        learning_rate=0.05,
        max_depth=6,
        reg_alpha=alpha,
        reg_lambda=lambda_,
        random_state=42,
        verbose=-1
    )
    model.fit(X_train, y_train)
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    results_grid.append({
        'alpha': alpha,
        'lambda': lambda_,
        'val_mse': val_mse
    })
 
results_grid_df = pd.DataFrame(results_grid)
pivot_table = results_grid_df.pivot(index='lambda', columns='alpha', values='val_mse')
print("验证集 MSE (越小越好):")
print(pivot_table.round(4))

min_gain_to_split

The minimum gain required for a split; nodes whose best split falls below this threshold are not split further.

# Effect of min_gain_to_split
gain_values = [0, 0.01, 0.1, 0.5, 1.0]
 
for gain in gain_values:
    model = lgb.LGBMRegressor(
        n_estimators=200,
        learning_rate=0.05,
        min_gain_to_split=gain,
        random_state=42,
        verbose=-1
    )
    model.fit(X_train, y_train)
    # booster_.num_trees() returns a single count, so count leaves from the tree dump
    tree_df = model.booster_.trees_to_dataframe()
    n_leaves = int(tree_df['split_feature'].isna().sum())
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    print(f"min_gain={gain:5.2f}: 叶节点数约={n_leaves:4d}, 验证MSE={val_mse:.4f}")

2.4 Sampling Parameters

bagging_fraction / subsample

The fraction of samples randomly drawn at each iteration.

| Parameter | LightGBM | XGBoost | Typical values | Effect |
| --- | --- | --- | --- | --- |
| Row sampling | bagging_fraction | subsample | 0.5-0.9 | Reduces variance, curbs overfitting |
| Column sampling | feature_fraction | colsample_bytree | 0.5-0.9 | Adds diversity |
| Sampling frequency | bagging_freq | - | 1-5 | How often rows are resampled |

# Effect of the sampling parameters
bagging_fracs = [0.5, 0.7, 0.9, 1.0]
feature_fracs = [0.5, 0.7, 0.9, 1.0]
 
for bf in bagging_fracs:
    for ff in feature_fracs:
        model = lgb.LGBMRegressor(
            n_estimators=200,
            learning_rate=0.05,
            bagging_fraction=bf,
            feature_fraction=ff,
            bagging_freq=1,
            random_state=42,
            verbose=-1
        )
        model.fit(X_train, y_train)
        val_mse = mean_squared_error(y_val, model.predict(X_val))
        print(f"bagging={bf:.1f}, feature={ff:.1f}: val_mse={val_mse:.4f}")

3. Hyperparameter Tuning Strategies

3.1 Grid Search

Exhaustively evaluates every combination in the parameter grid.

from sklearn.model_selection import ParameterGrid
import warnings
warnings.filterwarnings('ignore')
 
# Define the parameter grid
param_grid = {
    'max_depth': [4, 6, 8],
    'num_leaves': [15, 31, 63],
    'learning_rate': [0.05, 0.1],
    'min_child_samples': [10, 20, 50]
}
 
print(f"总共 {len(list(ParameterGrid(param_grid)))} 种组合")
 
# Grid search
best_score = float('inf')
best_params = None
 
for params in ParameterGrid(param_grid):
    model = lgb.LGBMRegressor(
        n_estimators=500,
        **params,
        random_state=42,
        verbose=-1
    )
 
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
    )
 
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    if val_mse < best_score:
        best_score = val_mse
        best_params = params
 
print(f"\n最佳参数: {best_params}")
print(f"最佳验证 MSE: {best_score:.4f}")

3.2 Random Search

Samples parameter combinations at random, which is more sample-efficient than a full grid.

from sklearn.model_selection import ParameterSampler
import scipy.stats as stats
 
# Define the parameter distributions
param_distributions = {
    'max_depth': stats.randint(3, 10),
    'num_leaves': stats.randint(10, 100),
    'learning_rate': stats.uniform(0.01, 0.2),
    'min_child_samples': stats.randint(5, 100),
    'bagging_fraction': stats.uniform(0.5, 0.5),
    'feature_fraction': stats.uniform(0.5, 0.5),
    'reg_alpha': stats.uniform(0, 1),
    'reg_lambda': stats.uniform(0, 10)
}
 
# Random sampling
n_iter = 50
best_score = float('inf')
best_params = None
 
for i, params in enumerate(ParameterSampler(param_distributions, n_iter=n_iter, random_state=42)):
    model = lgb.LGBMRegressor(
        n_estimators=500,
        **params,
        random_state=42,
        verbose=-1
    )
 
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
    )
 
    val_mse = mean_squared_error(y_val, model.predict(X_val))
 
    if val_mse < best_score:
        best_score = val_mse
        best_params = params
 
    if (i + 1) % 10 == 0:
        print(f"已尝试 {i+1}/{n_iter}, 当前最佳: {best_score:.4f}")
 
print(f"\n最佳参数: {best_params}")
print(f"最佳验证 MSE: {best_score:.4f}")

3.3 Bayesian Optimization (Optuna)

A smarter search that uses the results of past trials to guide new ones.

import optuna
 
# Define the objective function
def objective(trial):
    """Optuna 目标函数"""
    params = {
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'num_leaves': trial.suggest_int('num_leaves', 10, 100),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
        'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
        'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
        'n_estimators': 1000,
        'random_state': 42,
        'verbose': -1
    }
 
    model = lgb.LGBMRegressor(**params)
 
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
    )
 
    val_mse = mean_squared_error(y_val, model.predict(X_val))
    return val_mse
 
# Create the study
study = optuna.create_study(direction='minimize', study_name='lgb_optimization')
 
# Optimize
print("Starting Bayesian optimization...")
study.optimize(objective, n_trials=100, show_progress_bar=True)
 
# Results
print(f"\nBest validation MSE: {study.best_value:.4f}")
print("Best params:")
for key, value in study.best_params.items():
    print(f"  {key}: {value}")
 
# Visualize the optimization history
try:
    from optuna.visualization import plot_optimization_history
    fig = plot_optimization_history(study)
    fig.show()
except ImportError:
    print("Optuna's built-in visualizations require plotly")
 
# Parameter importance analysis (which hyperparameters drove the results)
print("\nParameter importance:")
importance = optuna.importance.get_param_importances(study)
for param, imp in sorted(importance.items(), key=lambda x: x[1], reverse=True):
    print(f"  {param}: {imp:.3f}")

3.4 Tuning with Time-Series Cross-Validation

# Tune using time-series CV
from sklearn.model_selection import ParameterGrid
 
def time_series_cv_score(X, y, params, n_splits=5):
    """
    时序交叉验证评分
 
    使用滚动窗口验证
    """
    n_samples = len(X)
    fold_size = n_samples // (n_splits + 1)
 
    scores = []
 
    for i in range(n_splits):
        # Compute train/test index ranges
        test_start = (i + 1) * fold_size
        test_end = min(test_start + fold_size, n_samples)
        train_end = test_start
        train_start = max(0, train_end - 3 * fold_size)  # training window = 3 folds
 
        X_train_fold = X[train_start:train_end]
        y_train_fold = y[train_start:train_end]
        X_test_fold = X[test_start:test_end]
        y_test_fold = y[test_start:test_end]
 
        # Train the model
        model = lgb.LGBMRegressor(**params, verbose=-1)
        model.fit(
            X_train_fold, y_train_fold,
            eval_set=[(X_test_fold, y_test_fold)],
            # NB: early stopping against the scoring fold makes CV scores slightly optimistic
            callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
        )
 
        # Score
        score = mean_squared_error(y_test_fold, model.predict(X_test_fold))
        scores.append(score)
 
    return np.mean(scores)
 
# Tune using the CV score
param_grid = {
    'max_depth': [4, 6],
    'num_leaves': [31, 63],
    'learning_rate': [0.05, 0.1],
    'n_estimators': [500]
}
 
best_score = float('inf')
best_params = None
 
for params in ParameterGrid(param_grid):
    cv_score = time_series_cv_score(X, y, params, n_splits=5)
    print(f"参数: {params}, CV MSE: {cv_score:.4f}")
 
    if cv_score < best_score:
        best_score = cv_score
        best_params = params
 
print(f"\n最佳 CV 参数: {best_params}")
print(f"最佳 CV MSE: {best_score:.4f}")

4. Parameter References from the Quant Literature

Common parameter ranges summarized from the literature:

4.1 Tree Parameters

| Parameter | Recommended range | Common values in papers | Notes |
| --- | --- | --- | --- |
| max_depth | 4-8 | 5, 6 | Keep depth modest in quant settings |
| num_leaves | 15-127 | 31, 63 | Must be coordinated with max_depth |
| min_child_samples | 20-100 | 50 | Quant data is noisy, so use larger values |
| learning_rate | 0.01-0.1 | 0.05 | Small learning rate paired with many trees |

4.2 Regularization Parameters

| Parameter | Recommended range | Common values in papers | Notes |
| --- | --- | --- | --- |
| reg_alpha | 0-1 | 0, 0.1 | Quant features usually need no sparsification |
| reg_lambda | 0-10 | 1, 5 | Moderate regularization |
| min_gain_to_split | 0-0.1 | 0.01 | Prevents meaningless splits |

4.3 Sampling Parameters

| Parameter | Recommended range | Common values in papers | Notes |
| --- | --- | --- | --- |
| bagging_fraction | 0.7-0.9 | 0.8 | Reduces variance |
| feature_fraction | 0.7-0.9 | 0.8 | Adds diversity |
| bagging_freq | 1-5 | 1 | Resample rows every iteration |

4.4 Recommended Configuration Templates

# Conservative configuration (guards against overfitting)
conservative_params = {
    'max_depth': 5,
    'num_leaves': 31,
    'min_child_samples': 100,
    'learning_rate': 0.02,
    'n_estimators': 2000,
    'bagging_fraction': 0.7,
    'bagging_freq': 1,  # required for bagging_fraction to take effect
    'feature_fraction': 0.7,
    'reg_alpha': 0.5,
    'reg_lambda': 5,
    'min_gain_to_split': 0.05
}
 
# Standard configuration (balanced)
standard_params = {
    'max_depth': 6,
    'num_leaves': 63,
    'min_child_samples': 50,
    'learning_rate': 0.05,
    'n_estimators': 1000,
    'bagging_fraction': 0.8,
    'bagging_freq': 1,  # required for bagging_fraction to take effect
    'feature_fraction': 0.8,
    'reg_alpha': 0.1,
    'reg_lambda': 1,
    'min_gain_to_split': 0.01
}
 
# Aggressive configuration (chases accuracy)
aggressive_params = {
    'max_depth': 8,
    'num_leaves': 127,
    'min_child_samples': 20,
    'learning_rate': 0.1,
    'n_estimators': 500,
    'bagging_fraction': 0.9,
    'bagging_freq': 1,  # required for bagging_fraction to take effect
    'feature_fraction': 0.9,
    'reg_alpha': 0,
    'reg_lambda': 0.1,
    'min_gain_to_split': 0
}
 
# Choose by scenario
config_map = {
    'small data (<10k)': conservative_params,
    'medium data (10k-100k)': standard_params,
    'large data (>100k)': aggressive_params
}
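
A minimal usage sketch, reusing the arrays from section 1 (the thresholds mirror the config_map buckets):

# Pick a template by training-set size, then fit
n = len(X_train)
if n < 10_000:
    chosen = conservative_params
elif n <= 100_000:
    chosen = standard_params
else:
    chosen = aggressive_params

model = lgb.LGBMRegressor(**chosen, random_state=42, verbose=-1)
model.fit(X_train, y_train)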

5. Cross-Validation Ensemble Training

5.1 K-Fold Model Ensembling

from sklearn.model_selection import KFold
import warnings
warnings.filterwarnings('ignore')
 
def cross_validation_ensemble(X, y, X_test, n_folds=5, params=None):
    """
    交叉验证集成训练
 
    参数:
        X, y: 训练数据
        X_test: 测试数据
        n_folds: 折数
        params: 模型参数
 
    返回:
        predictions: 测试集预测(平均)
        models: 训练好的模型列表
        cv_scores: 各折评分
    """
    if params is None:
        params = {
            'n_estimators': 500,
            'learning_rate': 0.05,
            'max_depth': 6,
            'verbose': -1
        }
 
    # Note: unshuffled KFold keeps each fold contiguous in time, but training
    # folds can still lie in the validation fold's future; use TimeSeriesSplit
    # for a strict walk-forward scheme
    kf = KFold(n_splits=n_folds, shuffle=False)
 
    predictions = []
    models = []
    cv_scores = []
 
    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"\n=== Fold {fold + 1}/{n_folds} ===")
 
        X_train_fold = X[train_idx]
        y_train_fold = y[train_idx]
        X_val_fold = X[val_idx]
        y_val_fold = y[val_idx]
 
        # Train
        model = lgb.LGBMRegressor(**params, random_state=fold)
        model.fit(
            X_train_fold, y_train_fold,
            eval_set=[(X_val_fold, y_val_fold)],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50),
                lgb.log_evaluation(period=100)
            ]
        )
 
        # Validation score
        val_pred = model.predict(X_val_fold)
        val_mse = mean_squared_error(y_val_fold, val_pred)
        cv_scores.append(val_mse)
        print(f"验证集 MSE: {val_mse:.4f}")
 
        # Test-set prediction
        test_pred = model.predict(X_test)
        predictions.append(test_pred)
        models.append(model)
 
    # Average the per-fold predictions
    ensemble_pred = np.mean(predictions, axis=0)
 
    print(f"\n=== CV 结果 ===")
    print(f"各折 MSE: {cv_scores}")
    print(f"平均 MSE: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
 
    return ensemble_pred, models, cv_scores
 
# Usage
ensemble_pred, models, cv_scores = cross_validation_ensemble(
    X_train, y_train, X_test,
    n_folds=5,
    params={'n_estimators': 500, 'learning_rate': 0.05, 'max_depth': 6, 'verbose': -1}
)
 
print(f"\n集成预测测试集 MSE: {mean_squared_error(y_test, ensemble_pred):.4f}")

5.2 Rolling-Window Training

def rolling_ensemble_train(X, y, X_test, train_size=500, step=100, params=None):
    """
    滚动窗口集成训练
 
    模拟定期重新训练模型的生产环境
    """
    if params is None:
        params = {
            'n_estimators': 200,
            'learning_rate': 0.05,
            'max_depth': 6,
            'verbose': -1
        }
 
    n_samples = len(X)
    predictions = []
    models = []
    train_periods = []
 
    # Rolling training loop
    for train_end in range(train_size, n_samples + 1, step):
        train_start = max(0, train_end - train_size)
 
        print(f"\n训练期: [{train_start}, {train_end})")
 
        X_train_roll = X[train_start:train_end]
        y_train_roll = y[train_start:train_end]
 
        model = lgb.LGBMRegressor(**params, random_state=train_start)
        model.fit(X_train_roll, y_train_roll)
 
        pred = model.predict(X_test)
        predictions.append(pred)
        models.append(model)
        train_periods.append((train_start, train_end))
 
    # Average the predictions
    ensemble_pred = np.mean(predictions, axis=0)
 
    return ensemble_pred, models, train_periods
 
# Usage
rolling_pred, models, periods = rolling_ensemble_train(
    X_train, y_train, X_test,
    train_size=500,
    step=200
)
 
print(f"\n滚动集成预测测试集 MSE: {mean_squared_error(y_test, rolling_pred):.4f}")

6. A Complete Tuning Pipeline

6.1 End-to-End Workflow

import lightgbm as lgb
import optuna
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')
 
class TreeModelTrainer:
    """树模型训练器 - 完整的调优流水线"""
 
    def __init__(self, random_state=42):
        self.random_state = random_state
        self.best_params = None
        self.best_model = None
        self.training_history = []
 
    def prepare_data(self, X, y, val_size=0.15, test_size=0.15):
        """准备数据(时序划分)"""
        n = len(X)
        val_split = int(n * (1 - val_size - test_size))
        test_split = int(n * (1 - test_size))
 
        self.X_train = X[:val_split]
        self.y_train = y[:val_split]
        self.X_val = X[val_split:test_split]
        self.y_val = y[val_split:test_split]
        self.X_test = X[test_split:]
        self.y_test = y[test_split:]
 
        print(f"数据划分完成:")
        print(f"  训练: {len(self.X_train)}")
        print(f"  验证: {len(self.X_val)}")
        print(f"  测试: {len(self.X_test)}")
 
    def optimize_params(self, n_trials=100):
        """使用 Optuna 优化参数"""
        def objective(trial):
            params = {
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'num_leaves': trial.suggest_int('num_leaves', 10, 100),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
                'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
                'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
                'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
                'n_estimators': 1000,
                'random_state': self.random_state,
                'verbose': -1
            }
 
            model = lgb.LGBMRegressor(**params)
            model.fit(
                self.X_train, self.y_train,
                eval_set=[(self.X_val, self.y_val)],
                callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
            )
 
            val_mse = mean_squared_error(self.y_val, model.predict(self.X_val))
            return val_mse
 
        print(f"\n开始参数优化 ({n_trials} trials)...")
        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
 
        self.best_params = study.best_params
        print(f"\n最佳参数: {self.best_params}")
        print(f"最佳验证 MSE: {study.best_value:.4f}")
 
        return study
 
    def train_final_model(self, params=None):
        """使用最佳参数训练最终模型"""
        if params is None:
            params = self.best_params
 
        if params is None:
            raise ValueError("请先运行 optimize_params 或提供参数")
 
        # Merge the training and validation sets
        X_train_full = np.vstack([self.X_train, self.X_val])
        y_train_full = np.concatenate([self.y_train, self.y_val])
 
        # Train
        model = lgb.LGBMRegressor(**params)
        model.fit(
            X_train_full, y_train_full,
            # NB: early stopping against the test set leaks information into the
            # final model; in practice, reserve a separate holdout or fix
            # n_estimators from the tuning stage
            eval_set=[(self.X_test, self.y_test)],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50),
                lgb.log_evaluation(period=100)
            ]
        )
 
        self.best_model = model
        return model
 
    def evaluate(self):
        """评估最终模型"""
        if self.best_model is None:
            raise ValueError("请先训练模型")
 
        y_pred_train = self.best_model.predict(self.X_train)
        y_pred_val = self.best_model.predict(self.X_val)
        y_pred_test = self.best_model.predict(self.X_test)
 
        results = {
            'train_mse': mean_squared_error(self.y_train, y_pred_train),
            'val_mse': mean_squared_error(self.y_val, y_pred_val),
            'test_mse': mean_squared_error(self.y_test, y_pred_test),
            'train_ic': np.corrcoef(self.y_train, y_pred_train)[0, 1],
            'val_ic': np.corrcoef(self.y_val, y_pred_val)[0, 1],
            'test_ic': np.corrcoef(self.y_test, y_pred_test)[0, 1],
        }
 
        print("\n=== 模型评估 ===")
        for key, value in results.items():
            print(f"  {key}: {value:.4f}")
 
        return results
 
    def get_feature_importance(self, importance_type='split'):
        """获取特征重要性"""
        if self.best_model is None:
            raise ValueError("请先训练模型")
 
        importance = self.best_model.booster_.feature_importance(importance_type=importance_type)
        feature_names = [f'feature_{i}' for i in range(len(importance))]
 
        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)
 
        return importance_df
 
# Usage example
trainer = TreeModelTrainer(random_state=42)
 
# 1. Prepare the data
trainer.prepare_data(X, y, val_size=0.15, test_size=0.15)
 
# 2. Optimize parameters (10 trials for a quick demo)
study = trainer.optimize_params(n_trials=10)
 
# 3. Train the final model
final_model = trainer.train_final_model()
 
# 4. Evaluate
results = trainer.evaluate()
 
# 5. Feature importance
importance_df = trainer.get_feature_importance()
print("\n=== Feature Importance (Top 10) ===")
print(importance_df.head(10))

6.2 Saving and Loading Models

import os
import joblib
import json

# Save the model together with its configuration
def save_model_package(model, params, results, filepath):
    """Save a complete model package"""
    package = {
        'model': model,
        'params': params,
        'results': results,
        'metadata': {
            'framework': 'lightgbm',
            'version': lgb.__version__
        }
    }
    os.makedirs(os.path.dirname(filepath), exist_ok=True)  # ensure the target directory exists
    joblib.dump(package, filepath)
    print(f"Model saved to: {filepath}")
 
# Load the model
def load_model_package(filepath):
    """Load a model package"""
    package = joblib.load(filepath)
    print(f"Model loaded from {filepath}")
    print(f"Framework: {package['metadata']['framework']} {package['metadata']['version']}")
    return package
 
# Usage
save_model_package(
    trainer.best_model,
    trainer.best_params,
    results,
    'models/lgb_model.pkl'
)
 
# Load
loaded = load_model_package('models/lgb_model.pkl')
loaded_model = loaded['model']
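
Besides pickling, LightGBM's native text format is worth knowing: the booster is saved as plain text and is portable across library versions. A minimal sketch (the .txt path mirrors the pickle path above):

# Save only the booster in LightGBM's version-portable text format
trainer.best_model.booster_.save_model('models/lgb_model.txt')

# Reload as a raw Booster (predict() works; sklearn wrapper state is not restored)
booster = lgb.Booster(model_file='models/lgb_model.txt')
y_pred_loaded = booster.predict(X_test)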

Key Takeaways

Hyperparameter Impact

Impact ranking (high to low):
1. n_estimators + early_stopping  → prevents overfitting
2. learning_rate                  → convergence speed
3. max_depth / num_leaves         → model complexity
4. min_child_samples              → leaf constraints
5. feature_fraction               → feature sampling
6. bagging_fraction               → row sampling
7. reg_alpha / reg_lambda         → regularization strength

Choosing a Tuning Strategy

| Data size | Recommended strategy |
| --- | --- |
| < 10k | Grid search |
| 10k-100k | Random search |
| > 100k | Bayesian optimization |

Recommended Workflow

1. Quick experiment: high learning rate (0.1) + few trees (200) + default parameters
2. Coarse tuning: random search over 50-100 parameter sets
3. Fine tuning: Bayesian optimization with 100-200 trials
4. Final training: merge the train and validation sets, small learning rate (0.02) + many trees (2000); see the sketch below
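
A minimal sketch of step 4, reusing the arrays from section 1 and the best_params produced by the searches in section 3 (the specific values plugged in here are assumptions):

# Merge train + validation, then retrain with a small learning rate and many trees
X_full = np.vstack([X_train, X_val])
y_full = np.concatenate([y_train, y_val])

final_params = dict(best_params or {})  # tuned values, if a search was run
final_params.update(learning_rate=0.02, n_estimators=2000)

final_model = lgb.LGBMRegressor(**final_params, random_state=42, verbose=-1)
final_model.fit(X_full, y_full)  # no eval_set: train to the fixed tree count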

Next section: 04-评估指标详解.md - evaluation metrics tailored to quantitative models.