强化学习基础与实战

从 MDP 理论到 PPO 交易智能体,系统掌握 RL 在量化交易中的应用


Part 1:强化学习核心理论

1.1 强化学习 vs 监督学习

强化学习(Reinforcement Learning, RL)与监督学习有本质区别,理解这些区别是正确应用 RL 的前提。

维度监督学习强化学习
数据来源静态标注数据集智能体与环境交互生成
反馈信号即时、确定(标签)延迟、稀疏(奖励)
目标最小化预测误差最大化累积奖励
数据分布固定 i.i.d.非平稳,依赖策略
探索机制无需探索必须平衡探索与利用

在量化交易中,监督学习(如 LSTM 预测涨跌)告诉你”明天涨还是跌”,而强化学习告诉你”什么时候买、买多少、什么时候卖”。RL 直接优化的是交易策略的累计收益,而非中间预测步骤。

# 监督学习:预测标签
# X = [特征], y = [涨/跌]  →  训练 f(X) ≈ y
 
# 强化学习:学习策略
# 状态 s_t = 市场观测  →  动作 a_t = 买/卖/持有  →  奖励 r_t = 收益
# 目标:最大化 Σ γ^t * r_t

1.2 马尔可夫决策过程(MDP)

强化学习的数学框架是马尔可夫决策过程,由四元组定义:

符号名称量化交易对应
S状态空间(State)持仓、持仓盈亏、技术指标序列、资金比例
A动作空间(Action)买入/卖出/持有、仓位比例(连续动作)
R奖励函数(Reward)PnL、夏普比率、风险调整收益
P状态转移概率(Transition)市场价格演化(未知/近似)
γ折扣因子(Discount)衡量对远期收益的重视程度

马尔可夫性假设:当前状态包含了做出最优决策所需的全部信息。在交易中,这意味着我们假设当前的市场观测(价格、指标、持仓)足以决定下一步动作,无需历史路径。

"""
MDP 在量化交易中的映射
 
状态 s_t:
    - 价格特征: [close_t, return_1d, return_5d, volume_ratio]
    - 技术指标: [RSI_14, MACD, Bollinger_position]
    - 账户状态: [position, unrealized_pnl, cash_ratio]
 
动作 a_t:
    - 离散: {买入, 卖出, 持有}
    - 连续: [-1, 1] 表示目标仓位比例
 
奖励 r_t:
    - 简单: r_t = position_t * return_{t+1}
    - 风险调整: r_t = position_t * return_{t+1} - λ * σ²(position_returns)
"""

1.3 Bellman 方程

Bellman 方程是 RL 的基石,它将当前状态的价值分解为即时奖励与未来价值的期望。

状态价值函数 V(s)

V(s) = max_a [ R(s, a) + γ * Σ P(s'|s,a) * V(s') ]

动作价值函数 Q(s, a)

Q(s, a) = R(s, a) + γ * Σ P(s'|s,a) * max_a' Q(s', a')

Bellman 最优性原理的核心直觉:最优策略在每个状态都做出使当前奖励加上未来最优价值最大化的动作。在交易中,这意味着每一笔交易决策都要权衡即时收益和保持头寸带来的未来潜力。

import numpy as np
 
# 简单的 Bellman 迭代示例(网格世界比喻)
# 将交易状态离散化为网格,迭代求解最优价值函数
 
gamma = 0.99  # 折扣因子,接近1表示重视远期收益
num_states = 5  # 简化:5个市场状态(大跌、跌、平、涨、大涨)
 
# 初始化价值函数
V = np.zeros(num_states)
 
# 假设的状态转移矩阵(行=当前状态,列=下一状态)
P = np.array([
    [0.6, 0.3, 0.1, 0.0, 0.0],  # 大跌倾向于继续跌
    [0.2, 0.4, 0.3, 0.1, 0.0],
    [0.1, 0.2, 0.4, 0.2, 0.1],  # 平稳状态
    [0.0, 0.1, 0.3, 0.4, 0.2],
    [0.0, 0.0, 0.1, 0.3, 0.6],  # 大涨倾向于继续涨
])
 
# 假设的奖励函数(买入并持有)
R = np.array([-0.03, -0.01, 0.005, 0.01, 0.03])
 
# Bellman 迭代
for iteration in range(100):
    V_new = np.zeros_like(V)
    for s in range(num_states):
        V_new[s] = R[s] + gamma * np.sum(P[s] * V)
    if np.max(np.abs(V_new - V)) < 1e-8:
        break
    V = V_new
 
print(f"收敛于第 {iteration + 1} 轮迭代")
print(f"各状态最优价值: {V.round(4)}")

1.4 DQN — 深度 Q 网络

DQN(Deep Q-Network)用神经网络近似 Q 函数,解决了高维状态空间的值函数逼近问题。

核心创新

  1. 经验回放(Experience Replay):打破数据相关性,提高样本效率
  2. 目标网络(Target Network):稳定训练目标,避免振荡
"""
DQN 核心实现 — PyTorch
适用于离散动作空间的交易策略(买入/卖出/持有)
"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
 
 
class DQNNetwork(nn.Module):
    """深度 Q 网络结构"""
 
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )
 
    def forward(self, x):
        return self.net(x)  # 输出每个动作的 Q 值
 
 
class ReplayBuffer:
    """经验回放缓冲区"""
 
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
 
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
 
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states, dtype=np.float32),
            np.array(actions, dtype=np.int64),
            np.array(rewards, dtype=np.float32),
            np.array(next_states, dtype=np.float32),
            np.array(dones, dtype=np.float32),
        )
 
    def __len__(self):
        return len(self.buffer)
 
 
class DQNAgent:
    """DQN 智能体"""
 
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, tau=0.005):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
        # 在线网络和目标网络
        self.q_network = DQNNetwork(state_dim, action_dim).to(self.device)
        self.target_network = DQNNetwork(state_dim, action_dim).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
 
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.tau = tau  # 软更新系数
        self.buffer = ReplayBuffer()
 
    def select_action(self, state, epsilon=0.1):
        """ε-贪心策略选择动作"""
        if random.random() < epsilon:
            return random.randint(0, self.q_network.net[-1].out_features - 1)
        with torch.no_grad():
            state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.q_network(state_t)
            return q_values.argmax(dim=1).item()
 
    def update(self, batch_size=64):
        if len(self.buffer) < batch_size:
            return None
 
        # 从经验回放中采样
        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
 
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)
 
        # 计算当前 Q 值
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
 
        # 计算目标 Q 值(使用目标网络)
        with torch.no_grad():
            next_q = self.target_network(next_states).max(dim=1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)
 
        # Huber 损失比 MSE 更鲁棒
        loss = nn.SmoothL1Loss()(q_values, target_q)
 
        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 1.0)
        self.optimizer.step()
 
        # 软更新目标网络
        for target_param, param in zip(
            self.target_network.parameters(), self.q_network.parameters()
        ):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
 
        return loss.item()

1.5 REINFORCE — 策略梯度方法

REINFORCE 是最经典的策略梯度算法,直接参数化策略 π(a|s) 而非价值函数。

"""
REINFORCE 策略梯度算法 — PyTorch
直接学习交易策略(连续动作:仓位比例)
"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
 
 
class PolicyNetwork(nn.Module):
    """高斯策略网络(输出动作的均值和标准差)"""
 
    def __init__(self, state_dim, action_dim=1, hidden_dim=64):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )
        # 输出动作均值
        self.mean_head = nn.Linear(hidden_dim, action_dim)
        # 输出动作标准差(取对数保证正数)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
 
    def forward(self, x):
        features = self.shared(x)
        mean = torch.tanh(self.mean_head(features))  # 限制在 [-1, 1]
        std = torch.exp(self.log_std.clamp(-20, 2))  # 防止数值溢出
        return mean, std
 
    def get_action(self, state):
        """采样动作并计算对数概率"""
        mean, std = self.forward(state)
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob
 
 
class REINFORCEAgent:
    """REINFORCE 智能体"""
 
    def __init__(self, state_dim, action_dim=1, lr=1e-3, gamma=0.99):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma
        self.saved_log_probs = []
        self.saved_rewards = []
 
    def select_action(self, state):
        state_t = torch.FloatTensor(state).unsqueeze(0)
        action, log_prob = self.policy.get_action(state_t)
        self.saved_log_probs.append(log_prob)
        return action.item()
 
    def store_reward(self, reward):
        self.saved_rewards.append(reward)
 
    def update(self):
        """一轮 episode 结束后更新策略"""
        returns = []
        discounted_r = 0
        # 从后往前计算折扣回报
        for r in reversed(self.saved_rewards):
            discounted_r = r + self.gamma * discounted_r
            returns.insert(0, discounted_r)
 
        # 标准化回报
        returns = torch.tensor(returns, dtype=torch.float32)
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
 
        # 策略梯度:∇J = Σ ∇log π(a|s) * G_t
        policy_loss = []
        for log_prob, G in zip(self.saved_log_probs, returns):
            policy_loss.append(-log_prob * G)
 
        loss = torch.stack(policy_loss).sum()
 
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
        self.optimizer.step()
 
        # 清空缓冲区
        self.saved_log_probs = []
        self.saved_rewards = []
 
        return loss.item()

1.6 PPO — 近端策略优化

PPO(Proximal Policy Optimization)是目前最实用的策略梯度算法,在样本效率和稳定性之间取得了良好平衡。

核心思想:限制策略更新幅度,避免”步子迈太大”导致性能崩塌。

L^{CLIP}(\theta) = E[min(
    r_t(\theta) * A_t,
    clip(r_t(\theta), 1-ε, 1+ε) * A_t
)]

其中:
    r_t(\theta) = π_θ(a_t|s_t) / π_θ_old(a_t|s_t)  # 新旧策略概率比
    A_t = Q(s_t, a_t) - V(s_t)                       # 优势函数
    ε ≈ 0.2                                           # 裁剪范围
"""
PPO 算法核心组件 — PyTorch
包含 Actor-Critic 架构和裁剪目标函数
"""
import torch
import torch.nn as nn
import torch.optim as optim
 
 
class ActorCritic(nn.Module):
    """Actor-Critic 共享网络"""
 
    def __init__(self, state_dim, action_dim=1, hidden_dim=128):
        super().__init__()
        # 共享特征提取
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
        )
        # Actor 头:输出动作分布参数
        self.actor_mean = nn.Linear(hidden_dim, action_dim)
        self.actor_log_std = nn.Parameter(torch.zeros(action_dim))
        # Critic 头:输出状态价值
        self.critic = nn.Linear(hidden_dim, 1)
 
    def forward(self, x):
        features = self.shared(x)
        # 动作分布
        mean = torch.tanh(self.actor_mean(features))
        std = torch.exp(self.actor_log_std.clamp(-20, 2))
        # 价值估计
        value = self.critic(features)
        return mean, std, value
 
    def get_action_and_value(self, x, action=None):
        mean, std, value = self.forward(x)
        dist = torch.distributions.Normal(mean, std)
        if action is None:
            action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1, keepdim=True)
        entropy = dist.entropy().sum(dim=-1, keepdim=True)
        return action, log_prob, entropy, value
 
 
def compute_gae(rewards, values, dones, next_values, gamma=0.99, lam=0.95):
    """计算广义优势估计(GAE)"""
    advantages = []
    gae = 0
    for t in reversed(range(len(rewards))):
        if t == len(rewards) - 1:
            next_val = next_values
        else:
            next_val = values[t + 1]
        # TD 误差
        delta = rewards[t] + gamma * next_val * (1 - dones[t]) - values[t]
        # GAE 递推
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages.insert(0, gae)
    return advantages
 
 
def ppo_update(actor_critic, optimizer, rollout_buffer, clip_eps=0.2,
               update_epochs=4, mini_batch_size=64):
    """
    PPO 更新步骤
 
    参数:
        actor_critic: Actor-Critic 网络
        optimizer: 优化器
        rollout_buffer: 收集的经验数据
        clip_eps: PPO 裁剪参数 (通常 0.2)
        update_epochs: 每批数据的更新轮数
        mini_batch_size: 小批量大小
    """
    states = torch.FloatTensor(np.array(rollout_buffer['states']))
    actions = torch.FloatTensor(np.array(rollout_buffer['actions']))
    old_log_probs = torch.FloatTensor(np.array(rollout_buffer['log_probs']))
    advantages = torch.FloatTensor(np.array(rollout_buffer['advantages']))
    returns = torch.FloatTensor(np.array(rollout_buffer['returns']))
 
    # 优势标准化
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
 
    total_policy_loss = 0
    total_value_loss = 0
    num_updates = 0
 
    for _ in range(update_epochs):
        # 随机打乱
        indices = torch.randperm(len(states))
 
        for start in range(0, len(states), mini_batch_size):
            end = start + mini_batch_size
            mb_idx = indices[start:end]
 
            mb_states = states[mb_idx]
            mb_actions = actions[mb_idx]
            mb_old_log_probs = old_log_probs[mb_idx]
            mb_advantages = advantages[mb_idx]
            mb_returns = returns[mb_idx]
 
            # 获取新的动作概率和价值
            _, new_log_prob, entropy, new_value = actor_critic.get_action_and_value(
                mb_states, mb_actions
            )
 
            # 策略概率比
            ratio = torch.exp(new_log_prob - mb_old_log_probs)
 
            # PPO 裁剪目标
            surr1 = ratio * mb_advantages
            surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * mb_advantages
            policy_loss = -torch.min(surr1, surr2).mean()
 
            # 价值函数损失
            value_loss = nn.MSELoss()(new_value.squeeze(), mb_returns)
 
            # 熵正则化(鼓励探索)
            entropy_loss = -entropy.mean()
 
            # 总损失
            loss = policy_loss + 0.5 * value_loss + 0.01 * entropy_loss
 
            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(actor_critic.parameters(), 0.5)
            optimizer.step()
 
            total_policy_loss += policy_loss.item()
            total_value_loss += value_loss.item()
            num_updates += 1
 
    return total_policy_loss / num_updates, total_value_loss / num_updates

1.7 SAC — 软演员-评论家

SAC(Soft Actor-Critic)结合了最大熵框架和 off-policy 学习,具有以下优势:

  • 自动探索:通过熵正则化自动平衡探索与利用
  • 样本高效:off-policy 算法,可重复利用历史数据
  • 鲁棒性:最大熵目标使策略对扰动更鲁棒
目标函数:π* = argmax_π E[Σ γ^t (r(s_t, a_t) + α * H(π(·|s_t)))]

其中:
    H(π(·|s_t)) = -E_{a~π}[log π(a|s_t)]  # 策略熵
    α: 温度参数,控制探索程度(可自动调节)

SAC 在实际交易中特别适合连续动作空间(如动态调整仓位比例),因为它不需要精心调参就能稳定训练。

1.8 RL 算法对比

算法类型动作空间样本效率训练稳定性量化适用场景
DQNValue-based离散简单买卖信号
REINFORCEPolicy-based连续/离散概念验证
PPOActor-Critic连续/离散仓位管理、策略优化
SACActor-Critic连续连续仓位调优
A2C/A3CActor-Critic连续/离散多资产并行
TD3Actor-Critic连续低延迟执行

实战建议:PPO 是当前量化交易的”默认选择”——样本效率适中、训练稳定、API 成熟(stable-baselines3 直接可用)。


Part 2:量化交易实战

2.1 TradingEnv — 自定义交易环境

使用 gymnasium 构建标准化交易环境是 RL 量化交易的第一步。

"""
量化交易环境 — 基于 gymnasium
状态/动作/奖励设计
"""
import gymnasium as gym
from gymnasium import spaces
import numpy as np
 
 
class TradingEnv(gym.Env):
    """
    标准化量化交易环境
 
    状态空间:
        - 过去 lookback 窗口的收益率序列
        - 技术指标(RSI, 波动率, 成交量比率)
        - 当前持仓比例
 
    动作空间:
        - 连续: [-1, 1],表示目标仓位比例
        - -1 = 全部做空, 0 = 空仓, 1 = 全仓做多
 
    奖励设计:
        - 风险调整收益: r = position * return - λ * variance
    """
 
    metadata = {"render_modes": ["human"]}
 
    def __init__(self, prices, lookback=20, transaction_cost=0.001,
                 risk_aversion=0.5, max_position=1.0):
        super().__init__()
 
        self.prices = prices  # 收盘价序列
        self.lookback = lookback
        self.transaction_cost = transaction_cost  # 手续费率
        self.risk_aversion = risk_aversion  # 风险厌恶系数
        self.max_position = max_position
 
        # 计算衍生特征
        self.returns = np.diff(prices, prepend=prices[0]) / prices[0]
 
        # 动作空间:连续值 [-1, 1]
        self.action_space = spaces.Box(
            low=-max_position, high=max_position, shape=(1,), dtype=np.float32
        )
 
        # 状态空间:[收益率序列, RSI, 波动率, 成交量比率, 当前仓位]
        obs_dim = lookback + 4  # 收益率序列 + 3个指标 + 仓位
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32
        )
 
        self.current_step = lookback
        self.position = 0.0
        self.portfolio_value = 1.0
 
    def _compute_rsi(self, window=14):
        """计算 RSI 指标"""
        delta = np.diff(self.prices)
        gain = np.where(delta > 0, delta, 0)
        loss = np.where(delta < 0, -delta, 0)
        avg_gain = np.convolve(gain, np.ones(window)/window, mode='valid')
        avg_loss = np.convolve(loss, np.ones(window)/window, mode='valid')
        rs = avg_gain / (avg_loss + 1e-8)
        rsi = 100 - (100 / (1 + rs))
        return rsi
 
    def _get_observation(self):
        """构建当前观测状态"""
        # 最近 lookback 天的收益率
        start = self.current_step - self.lookback
        ret_window = self.returns[start:self.current_step]
 
        # 当前技术指标
        rsi = self._compute_rsi()
        current_rsi = rsi[self.current_step - 14] if self.current_step >= 14 else 50.0
 
        # 滚动波动率
        vol = np.std(self.returns[start:self.current_step]) * np.sqrt(252)
 
        # 成交量比率(简化版)
        vol_ratio = 1.0  # 实际中应使用真实成交量
 
        # 拼接观测
        obs = np.concatenate([
            ret_window,
            [current_rsi / 100.0, vol, vol_ratio, self.position]
        ]).astype(np.float32)
 
        return obs
 
    def _compute_reward(self, action):
        """计算风险调整奖励"""
        # 执行交易后的仓位
        new_position = float(action[0])
        position_change = abs(new_position - self.position)
 
        # 交易成本
        cost = self.transaction_cost * position_change
 
        # 策略收益
        strategy_return = self.position * self.returns[self.current_step]
 
        # 风险惩罚(滚动窗口内的收益方差)
        if self.current_step >= self.lookback:
            recent_returns = self.returns[self.current_step - 20:self.current_step]
            risk_penalty = self.risk_aversion * np.var(recent_returns) * self.position ** 2
        else:
            risk_penalty = 0.0
 
        # 总奖励 = 策略收益 - 交易成本 - 风险惩罚
        reward = strategy_return - cost - risk_penalty
        return reward
 
    def step(self, action):
        """执行一步交易"""
        reward = self._compute_reward(action)
 
        # 更新状态
        self.position = float(np.clip(action[0], -self.max_position, self.max_position))
        self.current_step += 1
 
        # 更新组合价值
        self.portfolio_value *= (1 + self.position * self.returns[self.current_step - 1])
 
        done = self.current_step >= len(self.prices) - 1
        observation = self._get_observation()
 
        info = {
            "portfolio_value": self.portfolio_value,
            "position": self.position,
            "step": self.current_step,
        }
 
        return observation, reward, done, False, info
 
    def reset(self, seed=None, options=None):
        """重置环境"""
        super().reset(seed=seed)
        self.current_step = self.lookback
        self.position = 0.0
        self.portfolio_value = 1.0
        return self._get_observation(), {}

2.2 PPO 训练 — stable-baselines3

使用工业级的 stable-baselines3 库进行 PPO 训练,避免从零实现。

"""
使用 stable-baselines3 训练 PPO 交易智能体
"""
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback, EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
import numpy as np
 
 
class PortfolioCallback(BaseCallback):
    """自定义回调:记录组合净值曲线"""
 
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.portfolio_values = []
 
    def _on_step(self):
        # 获取环境信息
        infos = self.locals.get("infos", [])
        if infos and "portfolio_value" in infos[0]:
            self.portfolio_values.append(infos[0]["portfolio_value"])
        return True
 
 
def train_trading_agent(env, total_timesteps=100_000):
    """
    训练 PPO 交易智能体
 
    参数:
        env: TradingEnv 实例
        total_timesteps: 总训练步数
    """
    # 向量化环境(标准化观测值)
    vec_env = DummyVecEnv([lambda: env])
    vec_env = VecNormalize(vec_env, norm_obs=True, norm_reward=True)
 
    # 创建 PPO 模型
    model = PPO(
        "MlpPolicy",           # 多层感知机策略
        vec_env,
        learning_rate=3e-4,    # 学习率
        n_steps=2048,          # 每次更新的步数
        batch_size=64,         # 批量大小
        n_epochs=10,           # 每批数据的训练轮数
        gamma=0.99,            # 折扣因子
        gae_lambda=0.95,       # GAE lambda
        clip_range=0.2,        # PPO 裁剪范围
        ent_coef=0.01,         # 熵正则化系数
        vf_coef=0.5,           # 价值函数损失权重
        max_grad_norm=0.5,     # 梯度裁剪
        verbose=1,
    )
 
    # 设置评估回调
    eval_env = DummyVecEnv([lambda: env])
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path="./best_model/",
        eval_freq=5000,
        n_eval_episodes=5,
        deterministic=True,
    )
 
    # 训练
    model.learn(
        total_timesteps=total_timesteps,
        callback=[PortfolioCallback(), eval_callback],
        progress_bar=True,
    )
 
    return model

2.3 RL 训练常见陷阱

陷阱症状解决方案
过拟合历史数据回测收益极好,实盘亏损滚动窗口训练、多市场验证
奖励设计偏差智能体学到歪曲策略(如频繁交易)使用风险调整奖励,惩罚换手率
观测泄露回测中使用了未来信息严格检查 look-ahead bias
动作空间过大训练不收敛,策略震荡从简单动作空间开始(如 3 档仓位)
奖励稀疏智能体学不到有效策略设计辅助奖励(如 Sharpe 中间奖励)
非平稳性市场制度变化导致策略失效在线学习/定期重训、市场状态识别
幸存者偏差只在成功股票上测试包含退市/停牌数据处理
# 常见奖励设计模式对比
 
# 反模式 1:纯收益奖励(导致高频交易)
reward_bad = position * daily_return
 
# 反模式 2:无成本奖励(忽略执行成本)
reward_bad2 = portfolio_value - prev_value
 
# 推荐模式:多维度风险调整奖励
def reward_function(position, returns, costs, risk_metrics):
    """
    综合奖励函数设计
 
    组成部分:
        1. 策略收益(扣除成本)
        2. 波动率惩罚
        3. 回撤惩罚
        4. 换手率惩罚
    """
    # 策略收益
    gross_return = position * returns
 
    # 交易成本
    net_return = gross_return - costs
 
    # 风险调整
    vol_penalty = risk_metrics.get("volatility", 0) * 0.5
    drawdown_penalty = max(0, -risk_metrics.get("drawdown", 0)) * 2.0
 
    # 换手率惩罚(防止过度交易)
    turnover_penalty = costs * 3.0
 
    return net_return - vol_penalty - drawdown_penalty - turnover_penalty

2.4 RL 增强 ML 方法

RL 并不一定要完全替代传统 ML,更实用的方式是用 RL 增强 ML 流水线的特定环节。

应用场景 1:仓位管理

传统 ML 输出信号(涨跌概率),RL 决定仓位大小:

"""
两阶段框架:ML 预测 + RL 仓位管理
 
Stage 1: ML 模型预测未来收益分布
Stage 2: RL 根据预测和账户状态决定仓位
"""
import numpy as np
 
# ML 预测阶段(传统方法)
# model.predict(X) → μ (预测均值), σ (预测不确定性)
 
# RL 仓位管理阶段
class PositionSizingEnv(gym.Env):
    """
    输入: ML 预测的 (μ, σ) + 账户状态
    输出: 仓位比例 w ∈ [-1, 1]
    """
    def __init__(self, predictions, actual_returns, lookback=10):
        self.predictions = predictions  # (μ, σ) 序列
        self.actual_returns = actual_returns
        # ... 环境初始化 ...

应用场景 2:执行优化

用 RL 优化大单拆分和执行时机,替代固定规则(TWAP/VWAP)。

2.5 Offline RL — 从历史数据学习

Online RL 需要与真实市场交互(代价高昂),Offline RL 可以直接从历史交易数据中学习。

算法核心思想适用场景
BCQ (Batch-Constrained Q-learning)限制动作在行为策略范围内保守改进现有策略
CQL (Conservative Q-Learning)惩罚 OOD 动作的 Q 值安全地从历史数据学习
IQL (Implicit Q-Learning)不需要 OOD 动作评估纯离线策略评估
"""
Offline RL 概念:CQL 的核心思想
 
问题:离线 Q 学习会高估未见动作的 Q 值
解决:在标准 Q 学习损失中加入保守惩罚项
 
L_CQL(θ) = L_Q(θ) + α * (E_{s~D}[log Σ_a exp(Q_θ(s,a))] - E_{(s,a)~D}[Q_θ(s,a)])
 
直观理解:
    - 第一项:惩罚所有动作的平均 Q 值(防止高估未见动作)
    - 第二项:保持数据集中真实动作的 Q 值
    - 结果:Q 函数只对数据中出现的动作给出高价值
"""

实战建议:如果已有大量历史交易数据但无法在线交互,Offline RL 是更安全的选择。推荐从 d3rlpy 库开始,它提供了 BCQ、CQL、IQL 等算法的即用实现。

2.6 何时用/不用 RL

场景推荐原因
高频做市传统规则 + 监督学习延迟要求高,RL 训练太慢
多资产组合优化RL(PPO/SAC)连续决策、动态平衡
执行算法优化RL序贯决策问题,RL 天然适合
简单择时策略监督学习任务简单,RL 杀鸡用牛刀
数据极少传统统计方法RL 样本效率低
需要可解释性规则/线性模型RL 是黑箱

核心总结

┌──────────────────────────────────────────────────────────────────┐
│                  RL 量化交易核心要点                               │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. MDP 建模是关键                                                │
│     状态设计 > 动作设计 > 奖励设计(按重要性排序)                  │
│                                                                  │
│  2. PPO 是默认选择                                                │
│     稳定、样本效率适中、SB3 直接可用                               │
│                                                                  │
│  3. 奖励函数决定一切                                              │
│     简单 PnL 奖励不够,必须加入风险惩罚和成本惩罚                    │
│                                                                  │
│  4. 过拟合是最大敌人                                              │
│     多市场验证、滚动窗口训练、在线微调                              │
│                                                                  │
│  5. RL + ML 是更实用的路径                                        │
│     ML 预测信号,RL 做仓位管理/执行优化                            │
│                                                                  │
│  6. Offline RL 降低实盘风险                                       │
│     先用 CQL/BCQ 在历史数据上验证,再考虑在线部署                   │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘