模型训练与调优
目录
1. 基本训练流程
2. 关键超参数详解
3. 超参数调优策略
4. 量化学术论文参数参考
5. 交叉验证集成训练
6. 完整调优流水线
核心知识点总结
1.1 LightGBM 基础训练
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
# 生成模拟数据
np.random.seed(42)
n_samples = 10000
n_features = 20
# 生成特征
X = np.random.randn(n_samples, n_features)
feature_names = [f'factor_{i}' for i in range(n_features)]
# 生成目标(模拟股票未来收益)
# 只有部分特征有用
true_coefs = np.zeros(n_features)
true_coefs[0] = 0.3 # 动量因子
true_coefs[1] = -0.2 # 反转因子
true_coefs[2] = 0.15 # 波动率因子
true_coefs[5] = 0.1 # 价值因子
y = X @ true_coefs + np.random.randn(n_samples) * 0.5
# 时序划分(模拟)
split_idx = int(0.7 * n_samples)
X_train, X_val, X_test = X[:split_idx], X[split_idx:int(0.85*n_samples)], X[int(0.85*n_samples):]
y_train, y_val, y_test = y[:split_idx], y[split_idx:int(0.85*n_samples)], y[int(0.85*n_samples):]
print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")
# 方法1:使用 sklearn API
model = lgb.LGBMRegressor(
n_estimators=1000,
learning_rate=0.05,
max_depth=6,
num_leaves=31,
min_child_samples=20,
subsample=0.8,
colsample_bytree=0.8,
reg_alpha=0.1,
reg_lambda=1.0,
random_state=42,
n_jobs=-1,
verbose=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
callbacks=[
lgb.early_stopping(stopping_rounds=50),
lgb.log_evaluation(period=100)
]
)
# 预测
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"\n测试集 MSE: {mse:.4f}")
print(f"最佳迭代数: {model.best_iteration_}")1.2 XGBoost 基础训练
import xgboost as xgb
# 创建 DMatrix(XGBoost 的高效数据结构)
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
dtest = xgb.DMatrix(X_test, label=y_test)
# 参数
params = {
'objective': 'reg:squarederror',
'eval_metric': 'rmse',
'max_depth': 6,
'eta': 0.05,
'subsample': 0.8,
'colsample_bytree': 0.8,
'lambda': 1.0,
'alpha': 0.1,
'min_child_weight': 3,
'seed': 42
}
# 训练
evals_result = {}
bst = xgb.train(
params,
dtrain,
num_boost_round=1000,
evals=[(dtrain, 'train'), (dval, 'val')],
early_stopping_rounds=50,
evals_result=evals_result,
verbose_eval=100
)
# 预测
y_pred_xgb = bst.predict(dtest)
mse_xgb = mean_squared_error(y_test, y_pred_xgb)
print(f"\nXGBoost 测试集 MSE: {mse_xgb:.4f}")
print(f"最佳迭代数: {bst.best_iteration}")1.3 使用原生 API(更高效)
# LightGBM 原生 API
train_data = lgb.Dataset(X_train, label=y_train)
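# 补充说明:reference=train_data 使验证集沿用训练集的特征分箱(bin)边界,保证评估口径一致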
valid_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
params = {
'boosting_type': 'gbdt',
'objective': 'regression',
'metric': 'rmse',
'max_depth': 6,
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_child_samples': 20,
'lambda_l1': 0.1,
'lambda_l2': 1.0,
'verbose': -1
}
gbm = lgb.train(
params,
train_data,
num_boost_round=1000,
valid_sets=[train_data, valid_data],
callbacks=[
lgb.early_stopping(stopping_rounds=50),
lgb.log_evaluation(period=100)
]
)
y_pred_native = gbm.predict(X_test, num_iteration=gbm.best_iteration)
print(f"原生 API 测试集 MSE: {mean_squared_error(y_test, y_pred_native):.4f}")2. 关键超参数详解
2.1 树结构参数
max_depth / num_leaves
控制树的复杂度。
| 参数 | LightGBM | XGBoost | 说明 | 典型值 |
|---|---|---|---|---|
| 树深度 | max_depth | max_depth | 树的最大深度 | 3-8 |
| 叶节点数 | num_leaves | 通过 max_depth 间接控制 | 最大叶节点数 | 15-127 |
重要关系:num_leaves ≤ 2^max_depth。LightGBM 官方建议取小于 2^max_depth 的值,否则 max_depth 的约束形同虚设。
# 演示深度和叶节点的关系
import matplotlib.pyplot as plt
depths = range(3, 9)
max_leaves = [2**d for d in depths]
fig, ax = plt.subplots(figsize=(10, 4))
ax.bar(depths, max_leaves, color='steelblue', alpha=0.7)
ax.set_xlabel('Max Depth')
ax.set_ylabel('Maximum Leaves')
ax.set_title('Relationship between Depth and Leaves')
ax.set_xticks(depths)
for i, (d, leaves) in enumerate(zip(depths, max_leaves)):
ax.text(d, leaves + 5, str(leaves), ha='center')
plt.tight_layout()
plt.show()
# 推荐:num_leaves = 2^(max_depth-1) 到 2^max_depth 之间
print("推荐配置:")
for d in [4, 5, 6]:
min_leaves = 2**(d-1)
max_leaves = 2**d
print(f" max_depth={d}: num_leaves ∈ [{min_leaves}, {max_leaves}]")min_child_samples / min_child_weight
控制叶节点的最小样本数,防止过拟合。
| 参数 | LightGBM | XGBoost | 说明 | 典型值 |
|---|---|---|---|---|
| 最小样本数 | min_child_samples | - | 叶节点最小样本数 | 10-100 |
| 最小权重和 | - | min_child_weight | 叶节点最小 Hessian 和 | 1-10 |
注:对平方误差回归,每个样本的 Hessian 恒为 1,此时 min_child_weight 近似等价于最小样本数。
# 示例:不同 min_child_samples 的影响
min_child_values = [5, 20, 50, 100]
results = []
for mcs in min_child_values:
model = lgb.LGBMRegressor(
n_estimators=200,
max_depth=6,
min_child_samples=mcs,
learning_rate=0.05,
random_state=42,
verbose=-1
)
model.fit(X_train, y_train)
train_mse = mean_squared_error(y_train, model.predict(X_train))
val_mse = mean_squared_error(y_val, model.predict(X_val))
results.append({
'min_child_samples': mcs,
'train_mse': train_mse,
'val_mse': val_mse,
        'overfit_ratio': val_mse / train_mse  # 比值越大,过拟合越严重
})
results_df = pd.DataFrame(results)
print("min_child_samples 影响:")
print(results_df)
2.2 学习参数
learning_rate / eta
学习率控制每棵树的贡献。
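直观地,梯度提升的第 m 轮更新为(示意,η 即 learning_rate):F_m(x) = F_{m-1}(x) + η · f_m(x)。η 越小,单棵树的贡献越小,达到同等拟合程度所需的树就越多——这正是下表"学习率-树数量"权衡的来源。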
| 学习率 | 效果 | 推荐树数量 | 适用场景 |
|---|---|---|---|
| 0.3-0.5 | 大步前进 | 100-300 | 快速实验 |
| 0.1 | 中等步长 | 500-1000 | 标准 |
| 0.05 | 小步前进 | 1000-3000 | 精细调优 |
| 0.01 | 很小步长 | 3000+ | 最佳精度 |
# 学习率与最佳迭代数的关系
learning_rates = [0.5, 0.1, 0.05, 0.01, 0.005]
print("学习率 | 最佳迭代数 | 训练时间(秒) | 验证MSE")
print("-" * 50)
import time
for lr in learning_rates:
start = time.time()
model = lgb.LGBMRegressor(
learning_rate=lr,
n_estimators=5000,
max_depth=6,
min_child_samples=20,
random_state=42,
verbose=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
)
elapsed = time.time() - start
val_mse = mean_squared_error(y_val, model.predict(X_val))
print(f"{lr:6.3f} | {model.best_iteration_:4d} | {elapsed:6.2f} | {val_mse:.4f}")n_estimators / num_boost_round
树的数量。配合早停使用,不需要精确设置。
# 早停机制演示
def train_with_early_stopping(early_stopping_rounds):
"""演示早停的影响"""
model = lgb.LGBMRegressor(
n_estimators=5000, # 设置一个很大的数
learning_rate=0.05,
max_depth=6,
random_state=42,
verbose=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
callbacks=[
lgb.early_stopping(stopping_rounds=early_stopping_rounds),
            lgb.log_evaluation(period=0)
]
)
return model.best_iteration_
for esr in [10, 30, 50, 100, 200]:
best_iter = train_with_early_stopping(esr)
print(f"早停轮数={esr:3d}: 最佳迭代数={best_iter}")2.3 正则化参数
L1 / L2 正则化
| 参数 | LightGBM | XGBoost | 作用 | 典型值 |
|---|---|---|---|---|
| L1 正则 | reg_alpha / lambda_l1 | alpha | 稀疏化特征权重 | 0-1 |
| L2 正则 | reg_lambda / lambda_l2 | lambda | 防止权重过大 | 0-10 |
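补充:按 XGBoost 论文的记号,这两项进入每棵树的复杂度惩罚(示意):Ω(f) = γT + α·Σ|w_j| + (λ/2)·Σ w_j²,其中 T 为叶子数、w_j 为各叶子的输出;γ 的角色大致对应下文的 min_gain_to_split。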
# 正则化效果演示
import itertools
alphas = [0, 0.01, 0.1, 1]
lambdas = [0, 0.1, 1, 10]
results_grid = []
for alpha, lambda_ in itertools.product(alphas, lambdas):
model = lgb.LGBMRegressor(
n_estimators=200,
learning_rate=0.05,
max_depth=6,
reg_alpha=alpha,
reg_lambda=lambda_,
random_state=42,
verbose=-1
)
model.fit(X_train, y_train)
val_mse = mean_squared_error(y_val, model.predict(X_val))
results_grid.append({
'alpha': alpha,
'lambda': lambda_,
'val_mse': val_mse
})
results_grid_df = pd.DataFrame(results_grid)
pivot_table = results_grid_df.pivot(index='lambda', columns='alpha', values='val_mse')
print("验证集 MSE (越小越好):")
print(pivot_table.round(4))
min_gain_to_split
分裂所需的最小增益,低于此值不再分裂。
# min_gain_to_split 效果
gain_values = [0, 0.01, 0.1, 0.5, 1.0]
for gain in gain_values:
model = lgb.LGBMRegressor(
n_estimators=200,
learning_rate=0.05,
min_gain_to_split=gain,
random_state=42,
verbose=-1
)
model.fit(X_train, y_train)
    tree_df = model.booster_.trees_to_dataframe()
    n_leaves = int(tree_df['split_feature'].isna().sum())  # 叶节点的 split_feature 为空
val_mse = mean_squared_error(y_val, model.predict(X_val))
print(f"min_gain={gain:5.2f}: 叶节点数约={n_leaves:4d}, 验证MSE={val_mse:.4f}")2.4 采样参数
bagging_fraction / subsample
每次迭代随机采样的样本比例。
| 参数 | LightGBM | XGBoost | 典型值 | 作用 |
|---|---|---|---|---|
| 样本采样 | bagging_fraction | subsample | 0.5-0.9 | 降低方差,防过拟合 |
| 特征采样 | feature_fraction | colsample_bytree | 0.5-0.9 | 增加多样性 |
| 采样频率 | bagging_freq | - | 1-5 | 每隔多少轮重新采样;为 0 时 bagging 不生效 |
# 采样参数的影响
bagging_fracs = [0.5, 0.7, 0.9, 1.0]
feature_fracs = [0.5, 0.7, 0.9, 1.0]
for bf in bagging_fracs:
for ff in feature_fracs:
model = lgb.LGBMRegressor(
n_estimators=200,
learning_rate=0.05,
bagging_fraction=bf,
feature_fraction=ff,
bagging_freq=1,
random_state=42,
verbose=-1
)
model.fit(X_train, y_train)
val_mse = mean_squared_error(y_val, model.predict(X_val))
print(f"bagging={bf:.1f}, feature={ff:.1f}: val_mse={val_mse:.4f}")3. 超参数调优策略
3.1 网格搜索(Grid Search)
穷举所有参数组合。
from sklearn.model_selection import ParameterGrid
import warnings
warnings.filterwarnings('ignore')
# 定义参数网格
param_grid = {
'max_depth': [4, 6, 8],
'num_leaves': [15, 31, 63],
'learning_rate': [0.05, 0.1],
'min_child_samples': [10, 20, 50]
}
print(f"总共 {len(list(ParameterGrid(param_grid)))} 种组合")
# 网格搜索
best_score = float('inf')
best_params = None
for params in ParameterGrid(param_grid):
model = lgb.LGBMRegressor(
n_estimators=500,
**params,
random_state=42,
verbose=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
)
val_mse = mean_squared_error(y_val, model.predict(X_val))
if val_mse < best_score:
best_score = val_mse
best_params = params
print(f"\n最佳参数: {best_params}")
print(f"最佳验证 MSE: {best_score:.4f}")3.2 随机搜索
随机采样参数组合,效率更高。
from sklearn.model_selection import ParameterSampler
import scipy.stats as stats
# 定义参数分布
param_distributions = {
'max_depth': stats.randint(3, 10),
'num_leaves': stats.randint(10, 100),
'learning_rate': stats.uniform(0.01, 0.2),
'min_child_samples': stats.randint(5, 100),
'bagging_fraction': stats.uniform(0.5, 0.5),
'feature_fraction': stats.uniform(0.5, 0.5),
'reg_alpha': stats.uniform(0, 1),
'reg_lambda': stats.uniform(0, 10)
}
# 随机采样
n_iter = 50
best_score = float('inf')
best_params = None
for i, params in enumerate(ParameterSampler(param_distributions, n_iter=n_iter, random_state=42)):
model = lgb.LGBMRegressor(
n_estimators=500,
**params,
random_state=42,
verbose=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
)
val_mse = mean_squared_error(y_val, model.predict(X_val))
if val_mse < best_score:
best_score = val_mse
best_params = params
if (i + 1) % 10 == 0:
print(f"已尝试 {i+1}/{n_iter}, 当前最佳: {best_score:.4f}")
print(f"\n最佳参数: {best_params}")
print(f"最佳验证 MSE: {best_score:.4f}")3.3 贝叶斯优化(Optuna)
智能搜索,利用历史试验结果。
import optuna
# 定义目标函数
def objective(trial):
"""Optuna 目标函数"""
params = {
'max_depth': trial.suggest_int('max_depth', 3, 10),
'num_leaves': trial.suggest_int('num_leaves', 10, 100),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
'n_estimators': 1000,
'random_state': 42,
'verbose': -1
}
model = lgb.LGBMRegressor(**params)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
)
val_mse = mean_squared_error(y_val, model.predict(X_val))
return val_mse
# 创建研究
study = optuna.create_study(direction='minimize', study_name='lgb_optimization')
# 优化
print("开始贝叶斯优化...")
study.optimize(objective, n_trials=100, show_progress_bar=True)
# 结果
print(f"\n最佳验证 MSE: {study.best_value:.4f}")
print("最佳参数:")
for key, value in study.best_params.items():
print(f" {key}: {value}")
# 可视化优化历史
try:
    from optuna.visualization import plot_optimization_history
    fig = plot_optimization_history(study)
    fig.show()
except ImportError:
    print("可视化需要安装 plotly")
# 参数重要性分析
print("\n参数重要性:")
importance = optuna.importance.get_param_importances(study)
for param, imp in sorted(importance.items(), key=lambda x: x[1], reverse=True):
print(f" {param}: {imp:.3f}")3.4 时序交叉验证调优
# 使用时序 CV 进行调优
from sklearn.model_selection import ParameterGrid
def time_series_cv_score(X, y, params, n_splits=5):
"""
时序交叉验证评分
使用滚动窗口验证
"""
n_samples = len(X)
fold_size = n_samples // (n_splits + 1)
scores = []
for i in range(n_splits):
# 计算训练和测试索引
test_start = (i + 1) * fold_size
test_end = min(test_start + fold_size, n_samples)
train_end = test_start
train_start = max(0, train_end - 3 * fold_size) # 3倍训练窗口
X_train_fold = X[train_start:train_end]
y_train_fold = y[train_start:train_end]
X_test_fold = X[test_start:test_end]
y_test_fold = y[test_start:test_end]
# 训练模型
model = lgb.LGBMRegressor(**params, verbose=-1)
model.fit(
X_train_fold, y_train_fold,
eval_set=[(X_test_fold, y_test_fold)],
            callbacks=[lgb.early_stopping(stopping_rounds=30), lgb.log_evaluation(period=0)]
)
# 评分
score = mean_squared_error(y_test_fold, model.predict(X_test_fold))
scores.append(score)
return np.mean(scores)
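# 折划分示意(设 f = n_samples // (n_splits + 1),n_splits=5 时):
#   fold 0: 训练 [0, f)   → 验证 [f, 2f)
#   fold 4: 训练 [2f, 5f) → 验证 [5f, 6f)
# 训练窗最长 3f,验证窗始终位于训练窗之后,避免未来信息泄露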
# 使用 CV 进行调优
param_grid = {
'max_depth': [4, 6],
'num_leaves': [31, 63],
'learning_rate': [0.05, 0.1],
'n_estimators': [500]
}
best_score = float('inf')
best_params = None
for params in ParameterGrid(param_grid):
cv_score = time_series_cv_score(X, y, params, n_splits=5)
print(f"参数: {params}, CV MSE: {cv_score:.4f}")
if cv_score < best_score:
best_score = cv_score
best_params = params
print(f"\n最佳 CV 参数: {best_params}")
print(f"最佳 CV MSE: {best_score:.4f}")4. 量化学术论文参数参考
根据文献总结的常用参数范围:
4.1 树模型参数
| 参数 | 推荐范围 | 论文常用值 | 说明 |
|---|---|---|---|
| max_depth | 4-8 | 5, 6 | 量化场景深度不宜过大 |
| num_leaves | 15-127 | 31, 63 | 需与 max_depth 配合 |
| min_child_samples | 20-100 | 50 | 量化数据噪声大,取值宜偏大 |
| learning_rate | 0.01-0.1 | 0.05 | 小学习率配合多树 |
4.2 正则化参数
| 参数 | 推荐范围 | 论文常用值 | 说明 |
|---|---|---|---|
| reg_alpha | 0-1 | 0, 0.1 | 量化特征通常不需要稀疏化 |
| reg_lambda | 0-10 | 1, 5 | 适度正则化 |
| min_gain_to_split | 0-0.1 | 0.01 | 防止无意义分裂 |
4.3 采样参数
| 参数 | 推荐范围 | 论文常用值 | 说明 |
|---|---|---|---|
| bagging_fraction | 0.7-0.9 | 0.8 | 降低方差 |
| feature_fraction | 0.7-0.9 | 0.8 | 增加多样性 |
| bagging_freq | 1-5 | 1 | 每次迭代都重新采样 |
4.4 推荐配置模板
# 保守配置(防过拟合)
conservative_params = {
'max_depth': 5,
'num_leaves': 31,
'min_child_samples': 100,
'learning_rate': 0.02,
'n_estimators': 2000,
'bagging_fraction': 0.7,
'feature_fraction': 0.7,
'reg_alpha': 0.5,
'reg_lambda': 5,
'min_gain_to_split': 0.05
}
# 标准配置(平衡)
standard_params = {
'max_depth': 6,
'num_leaves': 63,
'min_child_samples': 50,
'learning_rate': 0.05,
'n_estimators': 1000,
'bagging_fraction': 0.8,
'feature_fraction': 0.8,
'reg_alpha': 0.1,
'reg_lambda': 1,
'min_gain_to_split': 0.01
}
# 激进配置(追求精度)
aggressive_params = {
'max_depth': 8,
'num_leaves': 127,
'min_child_samples': 20,
'learning_rate': 0.1,
'n_estimators': 500,
'bagging_fraction': 0.9,
'feature_fraction': 0.9,
'reg_alpha': 0,
'reg_lambda': 0.1,
'min_gain_to_split': 0
}
# 根据场景选择
config_map = {
'数据量少(<10k)': conservative_params,
'数据量中等(10k-100k)': standard_params,
'数据量大(>100k)': aggressive_params
}
5. 交叉验证集成训练
5.1 K 折模型集成
from sklearn.model_selection import KFold
import warnings
warnings.filterwarnings('ignore')
def cross_validation_ensemble(X, y, X_test, n_folds=5, params=None):
"""
交叉验证集成训练
参数:
X, y: 训练数据
X_test: 测试数据
n_folds: 折数
params: 模型参数
返回:
predictions: 测试集预测(平均)
models: 训练好的模型列表
cv_scores: 各折评分
"""
if params is None:
params = {
'n_estimators': 500,
'learning_rate': 0.05,
'max_depth': 6,
'verbose': -1
}
    # 注意:KFold 即使 shuffle=False 也不是严格的时序验证——
    # 前几折的训练集中包含验证期之后的数据;严格场景可改用 sklearn 的 TimeSeriesSplit
    kf = KFold(n_splits=n_folds, shuffle=False)
predictions = []
models = []
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
print(f"\n=== Fold {fold + 1}/{n_folds} ===")
X_train_fold = X[train_idx]
y_train_fold = y[train_idx]
X_val_fold = X[val_idx]
y_val_fold = y[val_idx]
# 训练
model = lgb.LGBMRegressor(**params, random_state=fold)
model.fit(
X_train_fold, y_train_fold,
eval_set=[(X_val_fold, y_val_fold)],
callbacks=[
lgb.early_stopping(stopping_rounds=50),
lgb.log_evaluation(period=100)
]
)
# 验证集评分
val_pred = model.predict(X_val_fold)
val_mse = mean_squared_error(y_val_fold, val_pred)
cv_scores.append(val_mse)
print(f"验证集 MSE: {val_mse:.4f}")
# 测试集预测
test_pred = model.predict(X_test)
predictions.append(test_pred)
models.append(model)
# 平均预测
ensemble_pred = np.mean(predictions, axis=0)
print(f"\n=== CV 结果 ===")
print(f"各折 MSE: {cv_scores}")
print(f"平均 MSE: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
return ensemble_pred, models, cv_scores
# 使用
ensemble_pred, models, cv_scores = cross_validation_ensemble(
X_train, y_train, X_test,
n_folds=5,
params={'n_estimators': 500, 'learning_rate': 0.05, 'max_depth': 6, 'verbose': -1}
)
print(f"\n集成预测测试集 MSE: {mean_squared_error(y_test, ensemble_pred):.4f}")5.2 时序滚动训练
def rolling_ensemble_train(X, y, X_test, train_size=500, step=100, params=None):
"""
滚动窗口集成训练
模拟定期重新训练模型的生产环境
"""
if params is None:
params = {
'n_estimators': 200,
'learning_rate': 0.05,
'max_depth': 6,
'verbose': -1
}
n_samples = len(X)
predictions = []
models = []
train_periods = []
# 滚动训练
for train_end in range(train_size, n_samples + 1, step):
train_start = max(0, train_end - train_size)
print(f"\n训练期: [{train_start}, {train_end})")
X_train_roll = X[train_start:train_end]
y_train_roll = y[train_start:train_end]
model = lgb.LGBMRegressor(**params, random_state=train_start)
model.fit(X_train_roll, y_train_roll)
pred = model.predict(X_test)
predictions.append(pred)
models.append(model)
train_periods.append((train_start, train_end))
# 平均预测
ensemble_pred = np.mean(predictions, axis=0)
return ensemble_pred, models, train_periods
# 使用
rolling_pred, models, periods = rolling_ensemble_train(
X_train, y_train, X_test,
train_size=500,
step=200
)
print(f"\n滚动集成预测测试集 MSE: {mean_squared_error(y_test, rolling_pred):.4f}")6. 完整调优流水线
6.1 端到端流程
import lightgbm as lgb
import optuna
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')
class TreeModelTrainer:
"""树模型训练器 - 完整的调优流水线"""
def __init__(self, random_state=42):
self.random_state = random_state
self.best_params = None
self.best_model = None
self.training_history = []
def prepare_data(self, X, y, val_size=0.15, test_size=0.15):
"""准备数据(时序划分)"""
n = len(X)
val_split = int(n * (1 - val_size - test_size))
test_split = int(n * (1 - test_size))
self.X_train = X[:val_split]
self.y_train = y[:val_split]
self.X_val = X[val_split:test_split]
self.y_val = y[val_split:test_split]
self.X_test = X[test_split:]
self.y_test = y[test_split:]
print(f"数据划分完成:")
print(f" 训练: {len(self.X_train)}")
print(f" 验证: {len(self.X_val)}")
print(f" 测试: {len(self.X_test)}")
def optimize_params(self, n_trials=100):
"""使用 Optuna 优化参数"""
def objective(trial):
params = {
'max_depth': trial.suggest_int('max_depth', 3, 10),
'num_leaves': trial.suggest_int('num_leaves', 10, 100),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2, log=True),
'min_child_samples': trial.suggest_int('min_child_samples', 10, 100),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 0, 1),
'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
'n_estimators': 1000,
'random_state': self.random_state,
'verbose': -1
}
model = lgb.LGBMRegressor(**params)
model.fit(
self.X_train, self.y_train,
eval_set=[(self.X_val, self.y_val)],
                callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(period=0)]
)
val_mse = mean_squared_error(self.y_val, model.predict(self.X_val))
return val_mse
print(f"\n开始参数优化 ({n_trials} trials)...")
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
self.best_params = study.best_params
print(f"\n最佳参数: {self.best_params}")
print(f"最佳验证 MSE: {study.best_value:.4f}")
return study
def train_final_model(self, params=None):
"""使用最佳参数训练最终模型"""
if params is None:
params = self.best_params
if params is None:
raise ValueError("请先运行 optimize_params 或提供参数")
# 合并训练集和验证集
X_train_full = np.vstack([self.X_train, self.X_val])
y_train_full = np.concatenate([self.y_train, self.y_val])
# 训练
model = lgb.LGBMRegressor(**params)
model.fit(
X_train_full, y_train_full,
            # 注意:用测试集做早停会引入信息泄露,此处仅为流程演示;
            # 实践中应另留一段独立验证期,或直接沿用调优阶段得到的最佳迭代数
            eval_set=[(self.X_test, self.y_test)],
callbacks=[
lgb.early_stopping(stopping_rounds=50),
lgb.log_evaluation(period=100)
]
)
self.best_model = model
return model
def evaluate(self):
"""评估最终模型"""
if self.best_model is None:
raise ValueError("请先训练模型")
        # 注意:最终模型在 train+val 上训练,train/val 指标为样本内结果,仅测试集指标可信
        y_pred_train = self.best_model.predict(self.X_train)
y_pred_val = self.best_model.predict(self.X_val)
y_pred_test = self.best_model.predict(self.X_test)
results = {
'train_mse': mean_squared_error(self.y_train, y_pred_train),
'val_mse': mean_squared_error(self.y_val, y_pred_val),
'test_mse': mean_squared_error(self.y_test, y_pred_test),
'train_ic': np.corrcoef(self.y_train, y_pred_train)[0, 1],
'val_ic': np.corrcoef(self.y_val, y_pred_val)[0, 1],
'test_ic': np.corrcoef(self.y_test, y_pred_test)[0, 1],
}
print("\n=== 模型评估 ===")
for key, value in results.items():
print(f" {key}: {value:.4f}")
return results
def get_feature_importance(self, importance_type='split'):
"""获取特征重要性"""
if self.best_model is None:
raise ValueError("请先训练模型")
importance = self.best_model.booster_.feature_importance(importance_type=importance_type)
feature_names = [f'feature_{i}' for i in range(len(importance))]
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': importance
}).sort_values('importance', ascending=False)
return importance_df
# 使用示例
trainer = TreeModelTrainer(random_state=42)
# 1. 准备数据
trainer.prepare_data(X, y, val_size=0.15, test_size=0.15)
# 2. 优化参数(快速演示用10 trials)
study = trainer.optimize_params(n_trials=10)
# 3. 训练最终模型
final_model = trainer.train_final_model()
# 4. 评估
results = trainer.evaluate()
# 5. 特征重要性
importance_df = trainer.get_feature_importance()
print("\n=== 特征重要性 (Top 10) ===")
print(importance_df.head(10))
6.2 模型保存与加载
import os
import joblib
# 保存模型和配置
def save_model_package(model, params, results, filepath):
"""保存完整模型包"""
package = {
'model': model,
'params': params,
'results': results,
'metadata': {
'framework': 'lightgbm',
'version': lgb.__version__
}
}
    os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)  # 确保目标目录存在
    joblib.dump(package, filepath)
print(f"模型已保存到: {filepath}")
# 加载模型
def load_model_package(filepath):
"""加载模型包"""
package = joblib.load(filepath)
print(f"模型已从 {filepath} 加载")
print(f"框架: {package['metadata']['framework']} {package['metadata']['version']}")
return package
# 使用
save_model_package(
trainer.best_model,
trainer.best_params,
results,
'models/lgb_model.pkl'
)
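# 补充:LightGBM 也支持原生文本格式,只保存 booster 本身,便于跨语言/跨版本加载(最小示意,文件名为自拟)
trainer.best_model.booster_.save_model('models/lgb_model.txt')
native_booster = lgb.Booster(model_file='models/lgb_model.txt')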
# 加载
loaded = load_model_package('models/lgb_model.pkl')
loaded_model = loaded['model']
核心知识点总结
超参数影响程度
影响度排序(从高到低):
1. n_estimators + early_stopping → 防止过拟合
2. learning_rate → 收敛速度
3. max_depth / num_leaves → 模型复杂度
4. min_child_samples → 叶节点约束
5. feature_fraction → 特征采样
6. bagging_fraction → 样本采样
7. reg_alpha / reg_lambda → 正则化强度
调优策略选择
| 数据量 | 推荐策略 | 预算 |
|---|---|---|
| < 10k | 网格搜索 | 低 |
| 10k-100k | 随机搜索 | 中 |
| > 100k | 贝叶斯优化 | 高 |
推荐工作流
1. 快速实验: 大学习率(0.1) + 少树(200) + 默认参数
2. 粗调: 随机搜索 50-100 组参数
3. 精调: 贝叶斯优化 100-200 trials
4. 最终训练: 合并训练+验证集,小学习率(0.02) + 多树(2000)
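下面给出该工作流的一个极简代码示意(沿用前文的 X_train/y_train、X_val/y_val、X_test/y_test;best_params 为演示占位,实际应来自第 2、3 步的调优结果):
# 第 1 步:快速实验——大学习率 + 少量树,先确认特征有预测力
quick = lgb.LGBMRegressor(learning_rate=0.1, n_estimators=200, random_state=42, verbose=-1)
quick.fit(X_train, y_train)
print(f"快速实验验证 MSE: {mean_squared_error(y_val, quick.predict(X_val)):.4f}")
# 第 2、3 步:粗调 / 精调见 3.2 随机搜索与 3.3 贝叶斯优化,得到 best_params(此处为占位值)
best_params = {'max_depth': 6, 'num_leaves': 63, 'min_child_samples': 50}
# 第 4 步:合并训练 + 验证集,小学习率 + 多树训练最终模型
X_full = np.vstack([X_train, X_val])
y_full = np.concatenate([y_train, y_val])
final = lgb.LGBMRegressor(**best_params, learning_rate=0.02, n_estimators=2000, random_state=42, verbose=-1)
final.fit(X_full, y_full)
print(f"最终模型测试 MSE: {mean_squared_error(y_test, final.predict(X_test)):.4f}")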
下一节: 04-评估指标详解.md - 学习量化模型的专用评估指标。