05-Model Training Optimization

This section covers training techniques for deep learning models in detail, including the training loop, loss functions, optimizers, learning rate scheduling, regularization, and early stopping.

Complete Training Loop

Basic Structure

Training loop flow:

Outer loop (epochs)
    │
    ├──→ Iterate over batches
    │       │
    │       ├──→ Forward pass: y_pred = model(x)
    │       │
    │       ├──→ Compute loss: loss = criterion(y_pred, y_true)
    │       │
    │       ├──→ Zero gradients: optimizer.zero_grad()
    │       │
    │       ├──→ Backward pass: loss.backward()
    │       │
    │       └──→ Update parameters: optimizer.step()
    │
    ├──→ Evaluate on the validation set
    │
    └──→ Save the best model

Basic Training Loop Code

import torch
import torch.nn as nn
import torch.optim as optim

def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0
    n_batches = 0

    for X_batch, y_batch in dataloader:
        # Move data to the target device
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        # Forward pass
        predictions = model(X_batch).squeeze()
        loss = criterion(predictions, y_batch)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

    return total_loss / n_batches
 
def validate(model, dataloader, criterion, device):
    """Evaluate on the validation set."""
    model.eval()
    total_loss = 0
    n_batches = 0

    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            predictions = model(X_batch).squeeze()
            loss = criterion(predictions, y_batch)

            total_loss += loss.item()
            n_batches += 1

    return total_loss / n_batches
 
def train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs):
    """Full training function."""
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # Train
        train_loss = train_epoch(model, train_loader, criterion, optimizer, device)

        # Validate
        val_loss = validate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

    return train_losses, val_losses

# Usage example
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = model.to(device)
# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)
# train_losses, val_losses = train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs=50)

Version with a Progress Bar

from tqdm import tqdm

def train_epoch_with_progress(model, dataloader, criterion, optimizer, device):
    """Train for one epoch with a progress bar."""
    model.train()
    total_loss = 0

    progress_bar = tqdm(dataloader, desc='Training')
    for X_batch, y_batch in progress_bar:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        predictions = model(X_batch).squeeze()
        loss = criterion(predictions, y_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        progress_bar.set_postfix({'loss': f'{loss.item():.6f}'})

    return total_loss / len(dataloader)

def train_model_with_progress(model, train_loader, val_loader, criterion, optimizer, device, epochs):
    """Full training with progress bars."""
    train_losses = []
    val_losses = []

    for epoch in tqdm(range(epochs), desc='Epochs'):
        train_loss = train_epoch_with_progress(model, train_loader, criterion, optimizer, device)
        val_loss = validate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        tqdm.write(f"Epoch {epoch+1}/{epochs} - Train: {train_loss:.6f}, Val: {val_loss:.6f}")

    return train_losses, val_losses

Choosing a Loss Function

Comparison of Common Loss Functions

| Loss | Formula | Gradient | Characteristics | Use case |
|------|---------|----------|-----------------|----------|
| MSE | $\frac{1}{n}\sum_i (y_i-\hat{y}_i)^2$ | Proportional to the error | Sensitive to large errors | General regression |
| MAE | $\frac{1}{n}\sum_i \lvert y_i-\hat{y}_i\rvert$ | Constant magnitude | Robust to outliers | Noisy targets |
| Huber | Quadratic near zero, linear in the tails | Smooth transition | Compromise between MSE and MAE | Robust regression |
| IC Loss | $-\mathrm{corr}(\hat{y}, y)$ | More complex | Directly optimizes IC | Quant prediction |
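
For reference, the standard Huber loss with threshold $\delta$ (the `delta` argument of `nn.HuberLoss` used below), applied to the residual $r = y - \hat{y}$, is:

$$
L_\delta(r) =
\begin{cases}
\frac{1}{2} r^2, & \lvert r \rvert \le \delta \\
\delta \left( \lvert r \rvert - \frac{1}{2}\delta \right), & \lvert r \rvert > \delta
\end{cases}
$$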

PyTorch Implementation

import torch
import torch.nn as nn

# 1. MSE Loss
mse_loss = nn.MSELoss()
predictions = torch.tensor([1.5, 2.5, 3.5])
targets = torch.tensor([1.0, 2.0, 3.0])
loss = mse_loss(predictions, targets)
print(f"MSE Loss: {loss.item()}")

# 2. MAE (L1) Loss
mae_loss = nn.L1Loss()
loss = mae_loss(predictions, targets)
print(f"MAE Loss: {loss.item()}")

# 3. Huber Loss
huber_loss = nn.HuberLoss(delta=1.0)  # delta is the quadratic/linear transition point
loss = huber_loss(predictions, targets)
print(f"Huber Loss: {loss.item()}")

# 4. Smooth L1 Loss (similar to Huber)
smooth_l1 = nn.SmoothL1Loss()
loss = smooth_l1(predictions, targets)
print(f"Smooth L1 Loss: {loss.item()}")

Quant-Specific: IC-Based Loss

The IC (Information Coefficient) is the correlation between predictions and realized values; optimizing the IC directly aligns the loss with the quant objective.
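
Concretely, the ICLoss class below computes the Pearson correlation

$$\mathrm{IC} = \frac{\mathrm{cov}(\hat{y}, y)}{\sigma_{\hat{y}}\,\sigma_y}$$

and returns its negative, so that minimizing the loss maximizes the IC.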

class ICLoss(nn.Module):
    """
    IC loss function.

    Minimizes the negative IC, which is equivalent to maximizing the IC.
    """
    def __init__(self, eps=1e-8):
        super(ICLoss, self).__init__()
        self.eps = eps

    def forward(self, predictions, targets):
        """
        Args:
            predictions: (batch_size,)
            targets: (batch_size,)

        Returns:
            loss: scalar
        """
        # Center both series
        pred_centered = predictions - predictions.mean()
        target_centered = targets - targets.mean()

        # Covariance and standard deviations
        covariance = (pred_centered * target_centered).mean()
        pred_std = pred_centered.std(unbiased=False) + self.eps
        target_std = target_centered.std(unbiased=False) + self.eps

        # Pearson correlation coefficient
        ic = covariance / (pred_std * target_std)

        # Minimize the negative IC
        return -ic.abs()  # or -ic if the sign of the correlation matters

# Usage example
ic_loss = ICLoss()
predictions = torch.tensor([1.5, 2.5, 3.5, 0.5])
targets = torch.tensor([1.0, 2.0, 3.0, 0.0])
loss = ic_loss(predictions, targets)
print(f"IC Loss: {loss.item()}")

# Sanity check: a perfect prediction gives IC = 1
predictions_perfect = targets.clone()
loss_perfect = ic_loss(predictions_perfect, targets)
print(f"Perfect-prediction IC Loss: {loss_perfect.item()}")  # should be close to -1

Rank IC Loss

Optimizes rank correlation instead:
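For reference, on rank vectors without ties the Spearman coefficient equals the Pearson correlation of the ranks, and reduces to

$$\rho = 1 - \frac{6 \sum_i d_i^2}{n(n^2 - 1)}$$

where $d_i$ is the rank difference of sample $i$; the implementation below computes it in the Pearson form.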

class RankICLoss(nn.Module):
    """
    Rank IC loss function (Spearman rank correlation).

    Note: the ranks come from argsort, which has no gradient, so this
    loss cannot train a model on its own. Use it as an evaluation metric,
    or replace the hard ranks with a differentiable soft-rank
    approximation for training.
    """
    def __init__(self, eps=1e-8):
        super(RankICLoss, self).__init__()
        self.eps = eps

    def forward(self, predictions, targets):
        # Compute ranks (double argsort); cast to float so mean/std work
        pred_rank = torch.argsort(torch.argsort(predictions, descending=True)).float()
        target_rank = torch.argsort(torch.argsort(targets, descending=True)).float()

        # Center the ranks
        pred_rank_centered = pred_rank - pred_rank.mean()
        target_rank_centered = target_rank - target_rank.mean()

        # Pearson correlation on the ranks
        covariance = (pred_rank_centered * target_rank_centered).mean()
        pred_std = pred_rank_centered.std(unbiased=False) + self.eps
        target_std = target_rank_centered.std(unbiased=False) + self.eps

        rank_ic = covariance / (pred_std * target_std)

        return -rank_ic.abs()

Combining Loss Functions

class CombinedLoss(nn.Module):
    """
    Combined loss function.

    Args:
        alpha: MSE weight
        beta: IC weight
    """
    def __init__(self, alpha=1.0, beta=0.1):
        super(CombinedLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.mse = nn.MSELoss()
        self.ic = ICLoss()

    def forward(self, predictions, targets):
        mse_loss = self.mse(predictions, targets)
        ic_loss = self.ic(predictions, targets)
        return self.alpha * mse_loss + self.beta * ic_loss

# Usage
loss_fn = CombinedLoss(alpha=1.0, beta=0.1)
loss = loss_fn(predictions, targets)
print(f"Combined Loss: {loss.item()}")

Optimizers

Comparison of Common Optimizers

| Optimizer | Characteristics | Hyperparameters | Use case |
|-----------|-----------------|-----------------|----------|
| SGD | Simple and stable | lr, momentum | Baseline models |
| Adam | Adaptive learning rates | lr, betas | Most scenarios |
| AdamW | Decoupled weight decay | lr, betas, weight_decay | Transformers |
| RAdam | Rectified Adam | lr, betas | Unstable early training |

Optimizer Setup

import torch.optim as optim

# Create model parameters
model = nn.Linear(10, 1)

# 1. SGD + momentum
optimizer_sgd = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,      # momentum coefficient
    weight_decay=1e-4  # L2 regularization
)

# 2. Adam (most common)
optimizer_adam = optim.Adam(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),  # decay rates for the first and second moments
    eps=1e-8,
    weight_decay=0
)

# 3. AdamW (Adam with decoupled weight decay)
optimizer_adamw = optim.AdamW(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),
    weight_decay=0.01    # weight decay
)

# 4. RMSprop
optimizer_rmsprop = optim.RMSprop(
    model.parameters(),
    lr=0.001,
    alpha=0.99,
    eps=1e-8
)

Optimizer Selection Guide

Decision tree (a code sketch follows below):

Enough time to tune hyperparameters?
├─ Yes → AdamW (current best practice)
│      ├─ Transformer models: weight_decay=0.01
│      └─ LSTM models: weight_decay=0.001
└─ No → Adam (the safe choice)
       └─ lr=0.001, default parameters

Very large dataset?
└─ Consider SGD + momentum (faster per step)

Need strong generalization?
└─ AdamW with a higher weight_decay
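
As a minimal sketch of this decision tree (the `build_optimizer` helper and its `model_type`/`tuned` parameters are illustrative, not a standard API):

import torch.optim as optim

def build_optimizer(model, model_type='transformer', tuned=True):
    """Illustrative helper that follows the decision tree above."""
    if not tuned:
        # No tuning budget: Adam with default parameters is the safe choice
        return optim.Adam(model.parameters(), lr=0.001)
    # With a tuning budget: AdamW, weight decay chosen per model family
    weight_decay = 0.01 if model_type == 'transformer' else 0.001
    return optim.AdamW(model.parameters(), lr=0.001, weight_decay=weight_decay)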

Learning Rate Scheduling

Scheduler Comparison

| Scheduler | Description | Use case |
|-----------|-------------|----------|
| StepLR | Decays at fixed intervals | Simple setups |
| ReduceLROnPlateau | Decays when the validation loss plateaus | Most practical |
| CosineAnnealingLR | Cosine annealing | Long training runs |
| OneCycleLR | One-cycle learning rate policy | Fast training |
| Warmup + Cosine | Warmup followed by cosine decay | Transformers |

Implementation

import math
import torch.optim.lr_scheduler as scheduler

# 1. StepLR: multiply the LR by gamma every step_size epochs
scheduler_step = scheduler.StepLR(
    optimizer,
    step_size=30,  # every 30 epochs
    gamma=0.1      # LR × 0.1
)

# 2. ReduceLROnPlateau: decay when the validation loss plateaus
scheduler_plateau = scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',          # lower metric is better
    factor=0.5,          # new LR = current × 0.5
    patience=5,          # decay only after 5 epochs without improvement
    threshold=1e-4,      # improvement threshold
    min_lr=1e-6          # minimum learning rate
)

# Using ReduceLROnPlateau
val_loss = validate(model, val_loader, criterion, device)
scheduler_plateau.step(val_loss)  # pass in the monitored metric

# 3. CosineAnnealingLR
scheduler_cosine = scheduler.CosineAnnealingLR(
    optimizer,
    T_max=50,      # half period (in epochs)
    eta_min=1e-6   # minimum learning rate
)

# 4. OneCycleLR
scheduler_onecycle = scheduler.OneCycleLR(
    optimizer,
    max_lr=0.01,        # peak learning rate
    total_steps=1000,   # total number of steps
    pct_start=0.3,      # fraction of steps spent increasing the LR
    anneal_strategy='cos'  # annealing strategy
)

# 5. Warmup + cosine decay (manual implementation)
class WarmupCosineScheduler:
    """Linear warmup followed by cosine annealing."""
    def __init__(self, optimizer, warmup_steps, total_steps, min_lr=1e-6):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.min_lr = min_lr
        self.base_lr = optimizer.param_groups[0]['lr']
        self.current_step = 0

    def step(self):
        self.current_step += 1

        if self.current_step <= self.warmup_steps:
            # Warmup phase: linear ramp-up
            lr = self.base_lr * self.current_step / self.warmup_steps
        else:
            # Cosine annealing phase
            progress = (self.current_step - self.warmup_steps) / (self.total_steps - self.warmup_steps)
            lr = self.min_lr + (self.base_lr - self.min_lr) * 0.5 * (1 + math.cos(math.pi * progress))

        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr

        return lr

# Usage
warmup_scheduler = WarmupCosineScheduler(optimizer, warmup_steps=100, total_steps=1000)
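
A usage sketch (assuming a `train_loader` and the usual training step): unlike the epoch-based schedulers above, this one is stepped once per batch.

for X_batch, y_batch in train_loader:
    # ... forward pass, loss.backward(), optimizer.step() ...
    lr = warmup_scheduler.step()  # advance the schedule after every batch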

Regularization Techniques

Dropout

import torch.nn as nn

# Dropout layer
dropout = nn.Dropout(p=0.3)  # drop each unit with probability 0.3

# Using it in a model
class RegularizedModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, dropout=0.3):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)  # active only in training mode
        x = self.fc2(x)
        return x

model = RegularizedModel(10, 32, 1, dropout=0.3)

Weight Decay

# Set via the optimizer
optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=1e-4  # L2 regularization coefficient
)

# For plain SGD this is equivalent to adding an L2 penalty to the loss:
# loss = original_loss + weight_decay * ||w||^2
# (AdamW instead applies decoupled weight decay directly to the weights)
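
A minimal sketch of that equivalence, written out by hand (`original_loss` and `model` are assumed from the surrounding context):

# Manual L2 penalty, equivalent to weight_decay under plain SGD
l2_penalty = sum((p ** 2).sum() for p in model.parameters())
loss = original_loss + 1e-4 * l2_penalty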

Gradient Clipping

# Clip by gradient norm
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()

# Clip by gradient value
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=0.5)

BatchNorm / LayerNorm

import torch.nn as nn

# BatchNorm1d (after fully connected layers)
batch_norm = nn.BatchNorm1d(num_features=64)

# LayerNorm (common in RNNs/Transformers)
layer_norm = nn.LayerNorm(normalized_shape=64)

# Using them in a model
class NormModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)  # batch normalization
        x = torch.relu(x)
        x = self.fc2(x)
        return x

Early Stopping

Principle and Implementation

import copy

class EarlyStopping:
    """
    Early stopping.

    Stops training when the validation loss has not improved for
    `patience` consecutive epochs.
    """
    def __init__(self, patience=10, min_delta=0, verbose=True):
        """
        Args:
            patience: number of epochs to wait for an improvement
            min_delta: minimum change that counts as an improvement
            verbose: whether to print progress messages
        """
        self.patience = patience
        self.min_delta = min_delta
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.best_model_state = None

    def __call__(self, val_loss, model):
        score = -val_loss  # lower loss is better, so negate

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score + self.min_delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter}/{self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0

    def save_checkpoint(self, model):
        """Save the best model weights."""
        if self.verbose:
            print('Validation loss improved. Saving model...')
        # Deep-copy the state dict; a shallow copy would still reference
        # the live tensors and be overwritten by further training
        self.best_model_state = copy.deepcopy(model.state_dict())

    def load_best_model(self, model):
        """Restore the best model weights."""
        if self.best_model_state is not None:
            model.load_state_dict(self.best_model_state)
            if self.verbose:
                print('Loaded best model from checkpoint.')
        return model
 
# Usage example
early_stopping = EarlyStopping(patience=10, min_delta=1e-6)

for epoch in range(epochs):
    train_loss = train_epoch(...)
    val_loss = validate(...)

    # Early stopping check
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print(f'Early stopping triggered at epoch {epoch+1}')
        break

# Restore the best model
model = early_stopping.load_best_model(model)

Saving and Loading Models

# Save a checkpoint
def save_model(model, optimizer, epoch, loss, filepath):
    """Save a model checkpoint."""
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }, filepath)

# Load a checkpoint
def load_model(model, optimizer, filepath):
    """Load a model checkpoint."""
    checkpoint = torch.load(filepath)  # add map_location='cpu' to load on CPU
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return model, optimizer, epoch, loss

# Usage
save_model(model, optimizer, epoch=50, loss=0.001, filepath='best_model.pth')
model, optimizer, start_epoch, min_loss = load_model(model, optimizer, 'best_model.pth')

Training Tricks

Mixed Precision Training

from torch.cuda.amp import autocast, GradScaler
# (newer PyTorch versions expose these as torch.amp.autocast / torch.amp.GradScaler)

# Create the GradScaler
scaler = GradScaler()

def train_epoch_mixed_precision(model, dataloader, criterion, optimizer, device):
    """Mixed precision training."""
    model.train()

    for X_batch, y_batch in dataloader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        # autocast selects float16/float32 per op automatically
        with autocast():
            predictions = model(X_batch).squeeze()
            loss = criterion(predictions, y_batch)

        # Scale the loss, then backpropagate
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

# Note: needs GPU support (compute capability >= 7.0, i.e. Volta or later)

Gradient Accumulation

def train_epoch_with_accumulation(model, dataloader, criterion, optimizer, device, accumulation_steps=4):
    """Gradient accumulation."""
    model.train()

    for i, (X_batch, y_batch) in enumerate(dataloader):
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        predictions = model(X_batch).squeeze()
        loss = criterion(predictions, y_batch) / accumulation_steps  # divide by the number of accumulation steps

        loss.backward()

        # Update once every accumulation_steps batches
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()

# Purpose: simulate a large batch size when memory-constrained
# (effective batch size = batch_size × accumulation_steps; any trailing
# batches not divisible by accumulation_steps are left unapplied here)

Production-Grade Training Pipeline

import os
import time
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm

class Trainer:
    """Production-grade trainer."""

    def __init__(
        self,
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        scheduler=None,
        device='cuda',
        early_stopping_patience=10,
        gradient_clip=1.0,
        mixed_precision=True,
        accumulation_steps=1,
        save_dir='checkpoints'
    ):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device
        self.gradient_clip = gradient_clip
        self.mixed_precision = mixed_precision and torch.cuda.is_available()
        self.accumulation_steps = accumulation_steps
        self.save_dir = save_dir

        # Create the checkpoint directory
        os.makedirs(save_dir, exist_ok=True)

        # Mixed precision
        self.scaler = GradScaler() if self.mixed_precision else None

        # Early stopping
        self.early_stopping = EarlyStopping(patience=early_stopping_patience)

        # Training history
        self.history = {
            'train_loss': [],
            'val_loss': [],
            'lr': [],
            'epoch_time': []
        }
 
    def train_epoch(self, epoch):
        """Train for one epoch."""
        self.model.train()
        total_loss = 0

        progress_bar = tqdm(self.train_loader, desc=f'Epoch {epoch}')

        for i, (X_batch, y_batch) in enumerate(progress_bar):
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            # Forward pass
            if self.mixed_precision:
                with autocast():
                    predictions = self.model(X_batch).squeeze()
                    loss = self.criterion(predictions, y_batch) / self.accumulation_steps
            else:
                predictions = self.model(X_batch).squeeze()
                loss = self.criterion(predictions, y_batch) / self.accumulation_steps

            # Backward pass
            if self.mixed_precision:
                self.scaler.scale(loss).backward()
            else:
                loss.backward()

            # Gradient accumulation
            if (i + 1) % self.accumulation_steps == 0:
                # Gradient clipping
                if self.gradient_clip > 0:
                    if self.mixed_precision:
                        self.scaler.unscale_(self.optimizer)
                    torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.gradient_clip)

                # Optimizer step
                if self.mixed_precision:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                self.optimizer.zero_grad()

            total_loss += loss.item() * self.accumulation_steps
            progress_bar.set_postfix({'loss': f'{loss.item()*self.accumulation_steps:.6f}'})

        return total_loss / len(self.train_loader)
 
    def validate(self):
        """Evaluate on the validation set."""
        self.model.eval()
        total_loss = 0

        with torch.no_grad():
            for X_batch, y_batch in self.val_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)

                predictions = self.model(X_batch).squeeze()
                loss = self.criterion(predictions, y_batch)
                total_loss += loss.item()

        return total_loss / len(self.val_loader)
 
    def train(self, epochs):
        """Full training run."""
        best_val_loss = float('inf')

        for epoch in range(1, epochs + 1):
            epoch_start = time.time()

            # Train
            train_loss = self.train_epoch(epoch)

            # Validate
            val_loss = self.validate()

            # Learning rate scheduling
            current_lr = self.optimizer.param_groups[0]['lr']
            if self.scheduler is not None:
                if isinstance(self.scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                    self.scheduler.step(val_loss)
                else:
                    self.scheduler.step()

            # Record history
            epoch_time = time.time() - epoch_start
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)
            self.history['lr'].append(current_lr)
            self.history['epoch_time'].append(epoch_time)

            # Log
            tqdm.write(
                f'Epoch {epoch}/{epochs} - '
                f'Train: {train_loss:.6f}, Val: {val_loss:.6f}, '
                f'LR: {current_lr:.2e}, Time: {epoch_time:.1f}s'
            )

            # Save the best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                self.save_model('best_model.pth')

            # Early stopping check
            self.early_stopping(val_loss, self.model)
            if self.early_stopping.early_stop:
                print(f'Early stopping at epoch {epoch}')
                break

        # Restore the best model
        self.model.load_state_dict(torch.load(f'{self.save_dir}/best_model.pth')['model_state_dict'])

        # Save the training history
        with open(f'{self.save_dir}/history.json', 'w') as f:
            json.dump(self.history, f, indent=2)

        return self.history

    def save_model(self, filename):
        """Save a checkpoint."""
        filepath = f'{self.save_dir}/{filename}'
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history
        }, filepath)
 
# Usage example
# optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
# scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
# trainer = Trainer(
#     model=model,
#     train_loader=train_loader,
#     val_loader=val_loader,
#     criterion=nn.MSELoss(),
#     optimizer=optimizer,
#     scheduler=scheduler,
#     device='cuda',
#     early_stopping_patience=10,
#     gradient_clip=1.0,
#     mixed_precision=True,
#     accumulation_steps=1
# )
#
# history = trainer.train(epochs=100)

Key Takeaways

1. Training loop

for epoch in range(epochs):
    for X, y in dataloader:
        y_pred = model(X)
        loss = criterion(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

2. Loss functions

# MSE: sensitive to large errors
mse = nn.MSELoss()

# MAE: robust to outliers
mae = nn.L1Loss()

# IC Loss: quant-specific
ic = ICLoss()

3. Learning rate scheduling

# ReduceLROnPlateau: the most practical choice
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5)
scheduler.step(val_loss)

4. Regularization

# Dropout
nn.Dropout(0.3)

# Weight decay
optim.Adam(model.parameters(), weight_decay=0.01)

# Gradient clipping
clip_grad_norm_(model.parameters(), max_norm=1.0)

5. Early stopping

early_stopping = EarlyStopping(patience=10)
early_stopping(val_loss, model)
if early_stopping.early_stop:
    break

Suggested Exercises

  1. Implement a training loop: build a complete trainer from scratch
  2. Compare loss functions: train with different losses and compare the resulting IC
  3. Hyperparameter experiments: systematically test different learning rates, dropout rates, and weight_decay values

Next Section

In 06-实战案例.md we will build an end-to-end deep learning project for quant trading.