Advanced Topics and Practical Case Studies

Multi-Agent RL, Multi-Asset Portfolios, and Offline RL: A Deep Dive into Advanced Applications of RL Trading

This article covers advanced topics in reinforcement learning for quantitative trading, including multi-agent RL, multi-asset portfolio optimization, offline RL, and a complete backtesting case study.


Multi-Agent RL

Multi-agent RL (MARL) studies how multiple agents cooperate or compete. In quantitative trading it can be applied to:

  • Coordinated management of multiple assets/strategies
  • Market microstructure modeling (buyer/seller games)
  • Portfolio risk management

Core Concepts of MARL

1. Cooperation vs. Competition

  • Cooperative MARL: all agents share a reward and pursue a common goal
    • Applications: coordinated multi-asset trading, distributed execution
  • Competitive MARL: zero-sum games between agents
    • Applications: counterparty modeling, market simulation
  • Mixed MARL: both cooperation and competition
    • Applications: partial-information games

2. State and Action Spaces

In MARL, each agent has its own observation and action:

The global state s_t contains all information, but each agent sees only a partial observation of it.
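As a concrete illustration of partial observability (the function and field names below are ours, not from any library), a global market state can be projected into one local observation per agent:

```python
import numpy as np

def local_observations(global_state: dict, agent_assets: list) -> list:
    """Each agent sees only its own asset's features plus shared market info."""
    shared = np.array([global_state["market_vol"]])
    return [
        np.concatenate([global_state["asset_features"][a], shared])
        for a in agent_assets
    ]

state = {
    "asset_features": {"AAPL": np.array([0.01, -0.02]), "MSFT": np.array([0.00, 0.03])},
    "market_vol": 0.15,
}
obs = local_observations(state, ["AAPL", "MSFT"])
print([o.tolist() for o in obs])  # two partial views of the same global state
```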

3. Reward Assignment

How to distribute a global reward among individual agents is a key problem in MARL:

  • Global reward: all agents share a single reward
  • Local reward: each agent has its own reward
  • Difference reward: credit assignment that accounts for each agent's individual contribution
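A quick numeric sketch of the third scheme (difference rewards; the helper names are ours): agent i is credited with the global reward minus the counterfactual global reward in which agent i held nothing, isolating its marginal contribution.

```python
import numpy as np

def global_reward(positions, asset_returns):
    """Global reward: total portfolio P&L across all agents."""
    return float(np.dot(positions, asset_returns))

def difference_reward(i, positions, asset_returns):
    """Credit for agent i: global reward minus the counterfactual where i holds nothing."""
    counterfactual = positions.copy()
    counterfactual[i] = 0.0
    return global_reward(positions, asset_returns) - global_reward(counterfactual, asset_returns)

positions = np.array([100.0, 50.0, 0.0])      # shares held by agents 0..2
asset_returns = np.array([0.5, -0.2, 1.0])    # per-share P&L
credits = [difference_reward(i, positions, asset_returns) for i in range(3)]
print(credits)  # → [50.0, -10.0, 0.0]
```

Note that agent 1 receives a negative credit even though the global P&L is positive: its position lost money, which a shared global reward would hide.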

A Simple Multi-Agent Trading Environment

import numpy as np
import gymnasium as gym
from gymnasium import spaces


class MultiAssetTradingEnv(gym.Env):
    """
    Multi-asset trading environment (cooperative MARL).

    Each agent manages one asset.
    Goal: maximize the risk-adjusted return of the whole portfolio.
    """

    def __init__(self, price_data, n_agents=5):
        """
        Args:
            price_data: dict, {asset_name: price_series}
            n_agents: number of agents (each manages one asset)
        """
        super().__init__()

        self.price_data = price_data
        self.asset_names = list(price_data.keys())[:n_agents]
        self.n_agents = n_agents

        # Per-agent action space: discrete [hold, buy, sell]
        self.action_space = spaces.MultiDiscrete([3] * n_agents)

        # Observation space: each agent sees price features of its own asset
        self.observation_space = spaces.Tuple([
            spaces.Box(low=-np.inf, high=np.inf, shape=(20,), dtype=np.float32)
            for _ in range(n_agents)
        ])

        # Environment state
        self.current_step = 0
        self.positions = np.zeros(n_agents)
        self.cash = 10000.0
        self.initial_cash = 10000.0

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = 20  # we need enough price history
        self.positions = np.zeros(self.n_agents)
        self.cash = self.initial_cash
        return self._get_obs(), {}

    def _get_obs(self):
        """Build each agent's observation."""
        observations = []
        for i, asset in enumerate(self.asset_names):
            prices = self.price_data[asset].values

            # Each agent only sees the price features of its own asset
            window_prices = prices[self.current_step - 20:self.current_step]
            returns = np.diff(window_prices) / window_prices[:-1]
            obs = np.concatenate([
                returns,                    # 19 returns
                [self.positions[i] / 100],  # own position
            ])
            observations.append(obs.astype(np.float32))

        return tuple(observations)

    def step(self, actions):
        """
        Execute one step.

        Args:
            actions: array of length n_agents, each entry in {0, 1, 2}
        """
        old_value = self._get_portfolio_value()

        # Each agent executes its own action
        for i, action in enumerate(actions):
            asset = self.asset_names[i]
            price = self.price_data[asset].values[self.current_step]

            if action == 1:  # buy
                amount = self.cash * 0.1 / self.n_agents  # spread cash across agents
                if amount > price:
                    shares = int(amount / price)
                    self.positions[i] += shares
                    self.cash -= shares * price * 1.001  # commission

            elif action == 2:  # sell
                if self.positions[i] > 0:
                    shares = int(self.positions[i] * 0.5)
                    self.cash += shares * price * 0.999
                    self.positions[i] -= shares

        # Advance time
        self.current_step += 1

        # Reward: portfolio return, shared by all agents (a global reward)
        new_value = self._get_portfolio_value()
        portfolio_return = (new_value - old_value) / old_value

        reward = portfolio_return * 100

        # Penalize over-concentration to encourage diversification
        position_values = np.array([
            self.positions[i] * self.price_data[self.asset_names[i]].values[self.current_step]
            for i in range(self.n_agents)
        ])
        if np.sum(position_values) > 0:
            concentration = np.max(np.abs(position_values)) / np.sum(position_values)
            reward -= 0.1 * max(0.0, concentration - 0.5)

        done = self.current_step >= len(next(iter(self.price_data.values()))) - 1

        return self._get_obs(), reward, done, False, {
            "portfolio_value": new_value,
            "return": (new_value - self.initial_cash) / self.initial_cash,
        }

    def _get_portfolio_value(self):
        """Compute portfolio value."""
        value = self.cash
        for i, asset in enumerate(self.asset_names):
            price = self.price_data[asset].values[self.current_step]
            value += self.positions[i] * price
        return value


# Usage example
if __name__ == "__main__":
    import pandas as pd

    # Generate price data for several assets
    np.random.seed(42)
    n_days = 1000

    price_data = {}
    for i in range(5):
        returns = np.random.normal(0.0001, 0.02, n_days)
        prices = 100 * np.exp(np.cumsum(returns))
        price_data[f"asset_{i}"] = pd.Series(prices)

    # Create the environment
    env = MultiAssetTradingEnv(price_data, n_agents=5)

    # Smoke test
    obs, info = env.reset()
    print(f"Observations: {len(obs)} agents, each {obs[0].shape}")

    for _ in range(10):
        actions = env.action_space.sample()  # MultiDiscrete: sample all agents at once
        obs, reward, done, truncated, info = env.step(actions)
        print(f"Reward: {reward:.4f}, Portfolio Value: {info['portfolio_value']:.2f}")

        if done:
            break

MAPPO: Multi-Agent PPO

MAPPO extends PPO to the multi-agent setting and is simple yet effective.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical


class MAPPOAgent:
    """
    Simplified MAPPO implementation.

    Core ideas:
    1. Each agent has its own actor.
    2. A single shared critic estimates the global value.
    3. Training uses the centralized value function.
    """

    def __init__(self, n_agents, obs_dim, action_dim, hidden_dim=64):
        self.n_agents = n_agents
        self.obs_dim = obs_dim
        self.action_dim = action_dim

        # One actor per agent
        self.actors = nn.ModuleList([
            nn.Sequential(
                nn.Linear(obs_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )
            for _ in range(n_agents)
        ])

        # Shared critic (takes the concatenated observations of all agents)
        self.critic = nn.Sequential(
            nn.Linear(obs_dim * n_agents, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, 1)
        )

        self.optimizer = torch.optim.Adam(
            list(self.actors.parameters()) + list(self.critic.parameters()),
            lr=3e-4
        )

    def select_actions(self, observations):
        """Select an action for every agent."""
        actions = []
        log_probs = []

        for i, obs in enumerate(observations):
            obs_tensor = torch.FloatTensor(obs)
            action_probs = self.actors[i](obs_tensor)
            dist = Categorical(action_probs)
            action = dist.sample()
            actions.append(action.item())
            log_probs.append(dist.log_prob(action))

        return actions, torch.stack(log_probs)

    def get_value(self, observations):
        """Estimate the global value from all agents' observations."""
        concat_obs = torch.cat([torch.FloatTensor(obs) for obs in observations])
        return self.critic(concat_obs)

    def update(self, trajectories, gamma=0.99, epsilon=0.2):
        """
        Update the networks.

        Args:
            trajectories: list of trajectories; each trajectory is a list of
                (obs, actions, log_probs, reward, next_obs, done) tuples
        """
        # Flatten trajectories into per-timestep records
        steps = [step for traj in trajectories for step in traj]

        # Discounted returns, computed per trajectory
        returns = []
        for traj in trajectories:
            R = 0.0
            episode_returns = []
            for (_, _, _, r, _, d) in reversed(traj):
                R = r + gamma * R * (1 - d)
                episode_returns.insert(0, R)
            returns.extend(episode_returns)
        returns = torch.FloatTensor(returns)

        # Advantages (simple version: returns - value)
        with torch.no_grad():
            values = torch.stack(
                [self.get_value(obs) for (obs, _, _, _, _, _) in steps]
            ).squeeze(-1)
        advantages = returns - values
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # PPO update
        for _ in range(10):  # PPO epochs
            for t in np.random.permutation(len(steps)):
                obs_t, actions_t, old_log_probs_t, _, _, _ = steps[t]

                # Actor loss: every agent shares the global advantage
                actor_loss = 0.0
                for agent_idx in range(self.n_agents):
                    obs = torch.FloatTensor(obs_t[agent_idx])
                    action = torch.tensor(actions_t[agent_idx])

                    # New policy
                    action_probs = self.actors[agent_idx](obs)
                    dist = Categorical(action_probs)
                    new_log_prob = dist.log_prob(action)

                    # Ratio against the behavior policy
                    ratio = torch.exp(new_log_prob - old_log_probs_t[agent_idx].detach())

                    # PPO clip
                    surr1 = ratio * advantages[t]
                    surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages[t]
                    actor_loss = actor_loss - torch.min(surr1, surr2)

                # Critic loss on the centralized value
                value = self.get_value(obs_t).squeeze()
                critic_loss = F.mse_loss(value, returns[t])

                # Total loss
                loss = actor_loss + 0.5 * critic_loss

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()


# Simplified training loop
if __name__ == "__main__":
    import pandas as pd

    # Generate data
    np.random.seed(42)
    n_days = 1000
    price_data = {}
    for i in range(5):
        returns = np.random.normal(0.0001, 0.02, n_days)
        prices = 100 * np.exp(np.cumsum(returns))
        price_data[f"asset_{i}"] = pd.Series(prices)

    env = MultiAssetTradingEnv(price_data, n_agents=5)

    # Create the MAPPO agent
    agent = MAPPOAgent(
        n_agents=5,
        obs_dim=20,
        action_dim=3
    )

    # Train
    n_episodes = 100

    for episode in range(n_episodes):
        obs, _ = env.reset()
        episode_rewards = []
        trajectory = []

        for step in range(100):
            # Select actions
            actions, log_probs = agent.select_actions(obs)

            # Step the environment
            next_obs, reward, done, truncated, info = env.step(actions)
            episode_rewards.append(reward)

            # Store the transition
            trajectory.append((obs, actions, log_probs, reward, next_obs, done))

            obs = next_obs

            if done:
                break

        # Update after each episode
        if len(trajectory) > 0:
            agent.update([trajectory])

        if episode % 10 == 0:
            print(f"Episode {episode}, Total Reward: {sum(episode_rewards):.2f}, "
                  f"Return: {info['return']:.2%}")

Multi-Asset Portfolio Optimization

RL can be used for dynamic asset allocation, adjusting portfolio weights in response to market conditions.

Combining Black-Litterman Ideas with RL
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces


class DynamicAssetAllocationEnv(gym.Env):
    """
    Dynamic asset-allocation environment.

    State: market features (risk, correlation, momentum)
    Action: asset weights [w1, w2, ..., wn]
    Reward: portfolio return minus a risk penalty
    """

    def __init__(self, returns_data, window=252):
        super().__init__()

        self.returns = returns_data
        self.n_assets = returns_data.shape[1]
        self.window = window

        # State: mean return and volatility per asset, plus the upper
        # triangle of the correlation matrix
        n_features = 2 * self.n_assets + self.n_assets * (self.n_assets - 1) // 2
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

        # Action: raw per-asset scores, mapped to weights via softmax
        # (finite bounds: SB3's SAC requires a bounded action space)
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(self.n_assets,), dtype=np.float32
        )

        self.current_step = window
        self.weights = np.ones(self.n_assets) / self.n_assets

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = self.window
        self.weights = np.ones(self.n_assets) / self.n_assets
        return self._get_obs(), {}

    def _get_obs(self):
        """Compute market features."""
        window_returns = self.returns[self.current_step - self.window:self.current_step]

        # Mean return and volatility of each asset
        mean_returns = window_returns.mean(axis=0).values
        std_returns = window_returns.std(axis=0).values

        # Correlation matrix (upper triangle only)
        corr_matrix = window_returns.corr().values
        corr_features = corr_matrix[np.triu_indices(self.n_assets, 1)]

        features = np.concatenate([mean_returns, std_returns, corr_features])
        return features.astype(np.float32)

    def step(self, action):
        """
        Execute one step.

        Args:
            action: raw scores (converted to valid weights via softmax)
        """
        # Map the action to valid weights (softmax, so they sum to 1)
        weights = np.exp(action)
        weights = weights / np.sum(weights)
        self.weights = weights

        # Next day's realized returns
        actual_returns = self.returns.iloc[self.current_step].values
        portfolio_return = np.dot(weights, actual_returns)

        # Reward = return - risk penalty,
        # using rolling-window volatility as the risk measure
        window_returns = self.returns[self.current_step - self.window:self.current_step]
        portfolio_volatility = np.sqrt(
            np.dot(weights.T, np.dot(window_returns.cov().values * 252, weights))
        )

        reward = portfolio_return * 100 - 0.5 * portfolio_volatility

        self.current_step += 1
        done = self.current_step >= len(self.returns) - 1

        return self._get_obs(), reward, done, False, {
            "weights": weights,
            "return": portfolio_return,
            "volatility": portfolio_volatility,
        }


# Train the allocator with SAC
if __name__ == "__main__":
    from stable_baselines3 import SAC

    # Generate multi-asset return data
    np.random.seed(42)
    n_days = 2000
    n_assets = 5

    # Generate correlated returns; build the covariance as A @ A.T
    # so it is positive semi-definite by construction
    mean_returns = np.random.normal(0.0001, 0.0001, n_assets)
    A = np.random.uniform(0.001, 0.02, (n_assets, n_assets))
    cov_matrix = A @ A.T

    returns = np.random.multivariate_normal(mean_returns, cov_matrix, n_days)
    returns_df = pd.DataFrame(returns, columns=[f"asset_{i}" for i in range(n_assets)])

    # Create the environment
    env = DynamicAssetAllocationEnv(returns_df)

    # Train SAC (continuous actions)
    model = SAC(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        buffer_size=100000,
        learning_starts=1000,
        batch_size=256,
        gamma=0.99,
        verbose=1,
    )

    print("Training the asset-allocation model...")
    model.learn(total_timesteps=50000)

    # Evaluate
    obs, _ = env.reset()
    weights_history = []

    for _ in range(100):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        weights_history.append(info["weights"])

        if done:
            break

    print(f"\nMean weights: {np.mean(weights_history, axis=0)}")
    print(f"Weight std: {np.std(weights_history, axis=0)}")
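A sanity check worth running next to the learned allocator: compare it against a simple risk-based baseline such as inverse-volatility weighting. A minimal sketch (`inverse_volatility_weights` is our name, not a library function):

```python
import numpy as np

def inverse_volatility_weights(returns_window: np.ndarray) -> np.ndarray:
    """Weight each asset proportionally to 1/volatility, normalized to sum to 1."""
    vols = returns_window.std(axis=0)
    inv = 1.0 / (vols + 1e-12)
    return inv / inv.sum()

rng = np.random.default_rng(0)
# Three assets with increasing volatility
window = rng.normal(0.0, [0.01, 0.02, 0.04], size=(252, 3))
w = inverse_volatility_weights(window)
print(w)  # low-volatility assets receive the largest weights
```

If the RL policy cannot beat such a static baseline out of sample, the extra complexity is not paying for itself.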

Hierarchical RL

Hierarchical RL decomposes the decision problem into levels, which suits complex trading scenarios.

import torch
import torch.nn as nn
import torch.nn.functional as F


class HierarchicalTradingAgent(nn.Module):
    """
    Hierarchical trading agent.

    High level (manager): chooses a strategy mode (trend / mean reversion / volatility)
    Low level (workers): execute concrete trading actions
    """

    def __init__(self, state_dim, n_modes=3):
        super().__init__()
        self.n_modes = n_modes

        # Manager: picks a strategy mode
        self.manager = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, n_modes),
            nn.Softmax(dim=-1)
        )

        # Workers: one per mode
        self.workers = nn.ModuleList([
            nn.Sequential(
                nn.Linear(state_dim + n_modes, 64),  # +n_modes for the mode one-hot
                nn.ReLU(),
                nn.Linear(64, 3),  # buy/sell/hold
                nn.Softmax(dim=-1)
            )
            for _ in range(n_modes)
        ])

    def select_mode(self, state):
        """High level: choose a strategy mode."""
        state_tensor = torch.FloatTensor(state)
        mode_probs = self.manager(state_tensor)
        mode = torch.distributions.Categorical(mode_probs).sample()
        return mode.item()

    def select_action(self, state, mode):
        """Low level: choose a concrete action."""
        mode_onehot = F.one_hot(torch.tensor(mode), num_classes=self.n_modes).float()
        state_tensor = torch.FloatTensor(state)
        input_vec = torch.cat([state_tensor, mode_onehot])
        action_probs = self.workers[mode](input_vec)
        action = torch.distributions.Categorical(action_probs).sample()
        return action.item()

    def forward(self, state):
        """Full decision: mode first, then action."""
        mode = self.select_mode(state)
        action = self.select_action(state, mode)
        return mode, action


# Strategy mode definitions
TRADING_MODES = {
    0: "trend_following",
    1: "mean_reversion",
    2: "volatility_breakout",
}
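Before training a manager end-to-end, it can help to sanity-check whether the three modes are even identifiable from price data. The sketch below is a crude hand-written regime labeler (the thresholds and the `label_regime` function are our illustrative assumptions, not part of the agent above); such labels could also be used to pre-train the manager with supervised learning.

```python
import numpy as np

def label_regime(prices: np.ndarray) -> str:
    """Crude regime label from a price window: trend, mean reversion, or volatility."""
    returns = np.diff(prices) / prices[:-1]
    momentum = (prices[-1] - prices[0]) / prices[0]
    vol = returns.std()
    if vol > 0.03:             # high daily volatility -> breakout regime
        return "volatility_breakout"
    if abs(momentum) > 0.05:   # strong drift -> trend regime
        return "trend_following"
    return "mean_reversion"

trending = 100 * np.exp(np.linspace(0, 0.1, 60))  # steadily rising prices
print(label_regime(trending))  # → trend_following
```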

Offline RL in Practice

Offline RL trains from a fixed dataset without interacting with the environment, which makes it a natural fit for quantitative trading.

CQL: Conservative Q-Learning

CQL adds a conservatism penalty to the Q-value update to avoid being over-optimistic about unseen state-action pairs.
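The penalty can be illustrated at a single state: roughly, CQL adds logsumexp over all actions of Q(s, a) minus Q(s, a_data), which pushes Q-values of actions absent from the data below those the behavior policy actually took. A minimal numeric sketch with toy Q-values (not the full algorithm):

```python
import numpy as np

def cql_penalty(q_values: np.ndarray, data_action: int) -> float:
    """Conservatism term: logsumexp over all actions minus Q of the logged action."""
    logsumexp = np.log(np.sum(np.exp(q_values)))
    return float(logsumexp - q_values[data_action])

q = np.array([1.0, 3.0, 0.5])  # toy Q-values for hold/buy/sell in one state
penalty = cql_penalty(q, data_action=1)
print(round(penalty, 4))
```

Because logsumexp is always at least the maximum Q-value, the penalty is non-negative, and it shrinks toward zero only when the logged action already dominates.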

"""
Offline RL in practice with d3rlpy.

d3rlpy is a dedicated offline RL library implementing CQL, BCQ, and others.
Note: the d3rlpy API has changed across major versions; the calls below
follow the v1.x style and may need adapting (in v2.x, algorithms are built
from config objects, e.g. d3rlpy.algos.DiscreteCQLConfig().create()).
"""

# Install: pip install d3rlpy

import d3rlpy
import numpy as np
import pandas as pd


def prepare_offline_dataset(price_data, signals, actions):
    """
    Build an offline RL dataset.

    Args:
        price_data: price series
        signals: historical signals (e.g. the output of a supervised model)
        actions: historical actions

    Returns:
        a d3rlpy MDPDataset
    """
    n_samples = len(price_data)
    prices_arr = np.asarray(price_data, dtype=float)  # plain array: avoids label-based indexing

    # Build observations
    observations = []
    for i in range(20, n_samples):
        # Price features
        window_prices = prices_arr[i - 20:i]
        returns = np.diff(window_prices) / window_prices[:-1]

        # Technical indicators
        sma_short = window_prices[-5:].mean()
        sma_long = window_prices[-20:].mean()
        momentum = (window_prices[-1] - window_prices[0]) / window_prices[0]

        obs = np.concatenate([
            returns,
            [sma_short / sma_long, momentum]
        ])
        observations.append(obs)

    observations = np.array(observations, dtype=np.float32)

    # Actions (historical trading actions)
    # Simplified here; in practice, extract them from real trade records
    actions = np.array(actions[20:n_samples])

    # Rewards: the return realized on the day after each observation
    step_returns = np.diff(prices_arr) / prices_arr[:-1]
    rewards = step_returns[19:n_samples - 1] * 100

    # Terminals (episode-end flags)
    terminals = np.zeros(n_samples - 20, dtype=bool)
    terminals[-1] = True  # last day

    # Create the d3rlpy dataset
    dataset = d3rlpy.dataset.MDPDataset(
        observations=observations,
        actions=actions,
        rewards=rewards,
        terminals=terminals,
    )

    return dataset


def train_cql_model(dataset, experiment_name="cql_trading"):
    """
    Train a CQL model.

    Args:
        dataset: d3rlpy MDPDataset
        experiment_name: experiment name for logging
    """
    # The actions here are discrete (0/1/2), so use the discrete CQL variant
    cql = d3rlpy.algos.DiscreteCQL(
        learning_rate=1e-4,
        batch_size=256,
        alpha=1.0,      # degree of conservatism
        use_gpu=True,
    )

    # Offline training
    cql.fit(
        dataset,
        n_steps=100000,
        n_steps_per_epoch=1000,
        save_interval=10,
        experiment_name=experiment_name,
        logdir="./d3rlpy_logs/",
    )

    return cql


def evaluate_cql_model(model, test_data):
    """Evaluate a trained CQL model on held-out data."""
    # Prepare test observations (actions are placeholders)
    test_dataset = prepare_offline_dataset(
        test_data["close"],
        test_data.get("signals", np.zeros(len(test_data))),
        np.zeros(len(test_data), dtype=int)
    )

    # Predict greedy actions on the test observations
    predicted_actions = model.predict(test_dataset.observations)

    print("CQL model evaluation:")
    print(f"Action distribution (hold/buy/sell): {np.bincount(predicted_actions, minlength=3)}")


# Example usage
if __name__ == "__main__":
    # Generate data
    np.random.seed(42)
    n_days = 2000
    prices = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, n_days)))
    price_df = pd.DataFrame({"close": prices})

    # Simulated historical actions (random here plus room for simple rules);
    # in practice, extract them from real trade records
    historical_actions = np.random.randint(0, 3, n_days)

    # Train/test split
    train_df = price_df[:1500]
    test_df = price_df[1500:].reset_index(drop=True)

    train_dataset = prepare_offline_dataset(
        train_df["close"],
        np.zeros(len(train_df)),
        historical_actions[:1500]
    )

    # Train CQL
    print("Training the CQL model...")
    cql = train_cql_model(train_dataset)

    # Evaluate
    print("\nEvaluating the CQL model...")
    evaluate_cql_model(cql, test_df)

BCQ: Batch-Constrained Q-learning

BCQ restricts action selection to actions close to those in the dataset, reducing distribution shift.
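In the discrete-action setting, this constraint is typically implemented by filtering: an action is eligible only if a behavior-cloning model assigns it at least a fraction τ of the most likely action's probability, and Q-maximization runs over that eligible set only. A small numeric sketch of the filtering rule (toy values; `bcq_select` is our name):

```python
import numpy as np

def bcq_select(q_values: np.ndarray, behavior_probs: np.ndarray, tau: float = 0.3) -> int:
    """Pick argmax-Q among actions whose behavior probability is within tau of the mode."""
    eligible = behavior_probs / behavior_probs.max() >= tau
    masked_q = np.where(eligible, q_values, -np.inf)
    return int(np.argmax(masked_q))

q = np.array([5.0, 1.0, 2.0])          # Q overestimates action 0...
probs = np.array([0.02, 0.58, 0.40])   # ...which the data almost never takes
print(bcq_select(q, probs))            # action 0 is filtered out -> picks 2
```

This is exactly the distribution-shift defense: the inflated Q-value of the rarely-seen action never gets the chance to be selected.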

"""
Simplified BCQ example.

A full BCQ implementation is fairly involved; for production use,
prefer a maintained library such as d3rlpy.
"""

import torch
import torch.nn as nn


class BCQGenerator(nn.Module):
    """
    BCQ generator network.

    Generates actions close to the behavior policy.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )

    def forward(self, state):
        return torch.tanh(self.net(state))


class BCQAgent:
    """
    BCQ agent.

    Core ideas:
    1. Train a generator to produce actions close to the behavior policy.
    2. Among the generated candidates, pick the one with the highest Q-value.
    """

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim

        # Generator (a deterministic simplification of BCQ's VAE)
        self.generator = BCQGenerator(state_dim, action_dim)

        # Q network
        self.q_network = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

        # Perturbation network (fine-tunes the generated actions)
        self.perturbation = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Tanh()
        )

    def select_action(self, state, n_candidates=10):
        """
        Select an action.

        Args:
            state: current state
            n_candidates: number of candidate actions to generate

        Returns:
            the best action
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            # Generate candidate actions; the generator here is deterministic
            # (it stands in for BCQ's VAE sampler), so add a little noise
            # to make the candidates actually differ
            actions = self.generator(state_tensor.repeat(n_candidates, 1))
            actions = torch.clamp(actions + 0.1 * torch.randn_like(actions), -1, 1)

            # Perturb them slightly
            perturbations = self.perturbation(
                torch.cat([state_tensor.repeat(n_candidates, 1), actions], dim=1)
            )
            actions = torch.clamp(actions + 0.05 * perturbations, -1, 1)

            # Pick the candidate with the highest Q-value
            q_values = self.q_network(
                torch.cat([state_tensor.repeat(n_candidates, 1), actions], dim=1)
            )
            best_idx = q_values.argmax()

        return actions[best_idx].squeeze().numpy()

Complete Backtesting Case Study

Case 1: Single-Asset PPO Trading

import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
 
 
class FullBacktestEnv(gym.Env):
    """
    完整回测环境
 
    特点:
    1. 考虑交易成本和滑点
    2. 支持多种技术指标
    3. 详细的状态和日志
    """
 
    def __init__(
        self,
        df: pd.DataFrame,
        initial_cash: float = 100000,
        commission: float = 0.001,
        slippage: float = 0.0001,
        window_size: int = 60,
    ):
        super().__init__()
 
        self.df = df.reset_index(drop=True)
        self.initial_cash = initial_cash
        self.commission = commission
        self.slippage = slippage
        self.window_size = window_size
 
        # 动作:0=hold, 1=buy, 2=sell, 3=close
        self.action_space = spaces.Discrete(4)
 
        # 状态:技术指标 + 账户状态
        n_features = self._compute_n_features()
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )
 
        # 记录
        self.trade_log = []
        self.equity_curve = []
 
    def _compute_n_features(self):
        """计算特征数量"""
        # 价格特征
        price_features = self.window_size  # 收益率
 
        # 技术指标
        indicator_features = 5  # RSI, MACD, 等
 
        # 账户特征
        account_features = 4  # 持仓、现金、净值、收益率
 
        return price_features + indicator_features + account_features
 
    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.position = 0
        self.cash = self.initial_cash
        self.trade_log = []
        self.equity_curve = [self.initial_cash]
        return self._get_obs(), {}
 
    def _get_obs(self):
        """构建观察向量"""
        if self.current_step < self.window_size:
            return np.zeros(self._compute_n_features())
 
        # 1. 价格特征:过去 window_size 天的收益率
        prices = self.df["close"].values
        window_prices = prices[self.current_step - self.window_size:self.current_step]
        returns = np.diff(window_prices) / window_prices[:-1]
        price_features = np.concatenate([[0], returns])
 
        # 2. 技术指标
        rsi = self._compute_rsi(prices, self.current_step)
        macd, signal = self._compute_macd(prices, self.current_step)
        bb_upper, bb_lower = self._compute_bollinger(prices, self.current_step)
        atr = self._compute_atr(prices, self.current_step)
 
        current_price = prices[self.current_step]
        indicator_features = np.array([
            rsi / 100,  # 归一化
            (current_price - bb_lower) / (bb_upper - bb_lower + 1e-8),
            macd / current_price,
            signal / current_price,
            atr / current_price,
        ])
 
        # 3. 账户特征
        total_value = self.cash + self.position * current_price
        account_features = np.array([
            self.position / self.initial_cash,  # 归一化持仓
            self.cash / self.initial_cash,       # 归一化现金
            total_value / self.initial_cash,     # 归一化净值
            (total_value - self.initial_cash) / self.initial_cash,  # 收益率
        ])
 
        obs = np.concatenate([price_features, indicator_features, account_features])
        return obs.astype(np.float32)
 
    def _compute_rsi(self, prices, idx, period=14):
        """计算 RSI"""
        if idx < period:
            return 50
        window = prices[idx-period:idx+1]
        deltas = np.diff(window)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        avg_gain = np.mean(gains)
        avg_loss = np.mean(losses)
        if avg_loss == 0:
            return 100
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))
 
    def _compute_macd(self, prices, idx, fast=12, slow=26, signal=9):
        """计算 MACD"""
        if idx < slow:
            return 0, 0
        window = prices[idx-slow:idx+1]
        ema_fast = pd.Series(window).ewm(span=fast).mean().iloc[-1]
        ema_slow = pd.Series(window).ewm(span=slow).mean().iloc[-1]
        macd = ema_fast - ema_slow
        return macd, macd * 0.9  # 简化的信号线
 
    def _compute_bollinger(self, prices, idx, period=20, std=2):
        """计算布林带"""
        if idx < period:
            return prices[idx] * 1.02, prices[idx] * 0.98
        window = prices[idx-period:idx+1]
        sma = np.mean(window)
        std_dev = np.std(window)
        return sma + std * std_dev, sma - std * std_dev
 
    def _compute_atr(self, prices, idx, period=14):
        """计算 ATR"""
        if idx < period:
            return prices[idx] * 0.02
        high = self.df["high"].values[idx-period:idx+1]
        low = self.df["low"].values[idx-period:idx+1]
        close = self.df["close"].values[idx-period:idx+1]
        tr = np.maximum(
            high[1:] - low[1:],
            np.maximum(abs(high[1:] - close[:-1]), abs(low[1:] - close[:-1]))
        )
        return np.mean(tr)
 
    def step(self, action):
        """执行一步"""
        current_price = self.df["close"].values[self.current_step]
        old_value = self.cash + self.position * current_price
 
        # 执行动作
        self._execute_action(action, current_price)
 
        # 推进时间
        self.current_step += 1
 
        # 检查是否结束
        terminated = self.current_step >= len(self.df) - 1
        truncated = False
 
        # 计算奖励
        new_price = self.df["close"].values[self.current_step]
        new_value = self.cash + self.position * new_price
 
        # 奖励 = 收益率 - 交易成本 - 持仓风险
        price_change = (new_price - current_price) / current_price
        position_return = price_change * (self.position * current_price) / old_value
        reward = position_return * 100
 
        # 交易成本惩罚
        if action in [1, 2, 3]:
            reward -= 0.1
 
        # 回撤惩罚
        peak = max(self.equity_curve) if self.equity_curve else self.initial_cash
        drawdown = (peak - new_value) / peak
        reward -= 0.5 * max(0, drawdown - 0.05)  # 回撤超过5%时惩罚
 
        # 记录
        self.equity_curve.append(new_value)
 
        info = {
            "total_value": new_value,
            "return": (new_value - self.initial_cash) / self.initial_cash,
            "position": self.position,
            "price": new_price,
        }
 
        return self._get_obs(), reward, terminated, truncated, info
 
    def _execute_action(self, action, price):
        """执行交易动作"""
        if action == 1:  # buy
            # 买入 20% 的现金
            buy_value = self.cash * 0.2
            if buy_value > price:
                # 计算滑点
                execution_price = price * (1 + self.slippage)
                shares = int(buy_value / execution_price)
                cost = shares * execution_price * (1 + self.commission)
                if cost <= self.cash:
                    self.position += shares
                    self.cash -= cost
                    self.trade_log.append({
                        "step": self.current_step,
                        "action": "buy",
                        "shares": shares,
                        "price": execution_price,
                    })
 
        elif action == 2:  # sell
            # 卖出 20% 的持仓
            if self.position > 0:
                shares = int(self.position * 0.2)
                if shares > 0:
                    execution_price = price * (1 - self.slippage)
                    revenue = shares * execution_price * (1 - self.commission)
                    self.position -= shares
                    self.cash += revenue
                    self.trade_log.append({
                        "step": self.current_step,
                        "action": "sell",
                        "shares": shares,
                        "price": execution_price,
                    })
 
        elif action == 3:  # close
            if self.position > 0:
                execution_price = price * (1 - self.slippage)
                revenue = self.position * execution_price * (1 - self.commission)
                self.cash += revenue
                self.trade_log.append({
                    "step": self.current_step,
                    "action": "close",
                    "shares": self.position,
                    "price": execution_price,
                })
                self.position = 0
            elif self.position < 0:
                execution_price = price * (1 + self.slippage)
                cost = -self.position * execution_price * (1 + self.commission)
                self.cash -= cost
                self.trade_log.append({
                    "step": self.current_step,
                    "action": "close",
                    "shares": -self.position,
                    "price": execution_price,
                })
                self.position = 0
 
 
# 生成完整的回测数据
def generate_full_backtest_data(n_days=5000):
    """生成更真实的模拟数据"""
    np.random.seed(42)
 
    # 生成带趋势和波动率变化的价格
    dt = 1/252
    prices = [100]
    volatility = 0.15
 
    for i in range(n_days - 1):
        # 随机波动率
        volatility = 0.1 + 0.1 * abs(np.sin(i / 500))
 
        # 趋势变化
        trend = 0.03 * np.sin(i / 1000)
 
        returns = np.random.normal(trend * dt, volatility * np.sqrt(dt))
        prices.append(prices[-1] * (1 + returns))
 
    df = pd.DataFrame({
        "close": prices,
        "high": [p * (1 + abs(np.random.normal(0, 0.005))) for p in prices],
        "low": [p * (1 - abs(np.random.normal(0, 0.005))) for p in prices],
        "volume": np.random.randint(1000000, 10000000, n_days),
    })
 
    return df
 
 
# 完整训练和评估流程
class TradingCallback(BaseCallback):
    """训练回调"""
 
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
 
    def _on_step(self):
        # 如果环境被 Monitor 包装,可以从 infos 中取出回合奖励用于记录
        for info in self.locals.get("infos", []):
            if "episode" in info:
                self.episode_rewards.append(info["episode"]["r"])
        return True
 
 
def run_full_backtest():
    """运行完整回测"""
 
    # 1. 生成数据
    print("生成数据...")
    df = generate_full_backtest_data(5000)
 
    # 划分训练/测试集
    train_size = int(0.7 * len(df))
    train_df = df[:train_size]
    test_df = df[train_size:]
 
    print(f"训练集: {len(train_df)} 天")
    print(f"测试集: {len(test_df)} 天")
 
    # 2. 创建环境
    train_env = FullBacktestEnv(train_df)
    test_env = FullBacktestEnv(test_df)
 
    # 3. 训练 PPO
    print("\n训练 PPO 模型...")
    model = PPO(
        "MlpPolicy",
        train_env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        verbose=1,
    )
 
    model.learn(total_timesteps=100000, callback=TradingCallback())
 
    # 4. 评估
    print("\n评估模型...")
    obs, info = test_env.reset()
    done, truncated = False, False
 
    test_equity = []
    test_actions = []
 
    while not (done or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = test_env.step(action)
        test_equity.append(info["total_value"])
        test_actions.append(action)
 
    # 5. 计算指标
    returns = np.diff(test_equity) / test_equity[:-1]
    sharpe = np.mean(returns) / np.std(returns) * np.sqrt(252) if np.std(returns) > 0 else 0
    # 最大回撤应基于逐步的历史峰值,而不是全局最大值与最小值之差
    equity = np.array(test_equity)
    running_peak = np.maximum.accumulate(equity)
    max_drawdown = np.max((running_peak - equity) / running_peak)
    total_return = (test_equity[-1] - test_equity[0]) / test_equity[0]
 
    print(f"\n=== 回测结果 ===")
    print(f"总收益率: {total_return:.2%}")
    print(f"夏普比率: {sharpe:.2f}")
    print(f"最大回撤: {max_drawdown:.2%}")
    print(f"最终净值: {test_equity[-1]:.2f}")
 
    # 6. 与基准比较
    # 买入持有
    buy_hold_return = (test_df["close"].iloc[-1] - test_df["close"].iloc[0]) / test_df["close"].iloc[0]
    print(f"\n买入持有收益率: {buy_hold_return:.2%}")
 
    # 7. 可视化
    plt.figure(figsize=(12, 6))
    plt.plot(test_equity, label="PPO Strategy")
    # 买入持有的净值曲线应跟随实际价格路径,而不是线性插值到最终收益
    bh_prices = test_df["close"].values[:len(test_equity)]
    plt.plot(
        test_env.initial_cash * bh_prices / bh_prices[0],
        label="Buy & Hold",
        linestyle="--"
    )
    plt.xlabel("Days")
    plt.ylabel("Portfolio Value")
    plt.title("Trading Strategy Backtest")
    plt.legend()
    plt.grid(True)
    plt.savefig("./rl_backtest_results.png")
    print("\n图表已保存到 ./rl_backtest_results.png")
 
    return model, test_equity
 
 
if __name__ == "__main__":
    model, equity = run_full_backtest()

风险管理与 RL

带风险约束的 RL

class RiskConstrainedEnv(gym.Env):
    """
    风险约束的交易环境
 
    特点:
    1. 回撤限制
    2. 波动率限制
    3. 持仓集中度限制
    """
 
    def __init__(self, df, max_drawdown=0.15, max_volatility=0.2):
        super().__init__()
        self.df = df
        self.max_drawdown = max_drawdown
        self.max_volatility = max_volatility
        self.initial_cash = 10000.0
        self.equity_curve = []

        # 其他初始化...

    def step(self, action):
        # 先检查风险限制,必要时覆盖智能体的动作
        current_drawdown = self._compute_drawdown()
        current_volatility = self._compute_volatility()

        risk_penalty = 0.0

        # 超过回撤限制:强制平仓,并施加大惩罚
        if current_drawdown > self.max_drawdown:
            action = 3  # 强制平仓
            risk_penalty -= 10.0

        # 超过波动率限制:只允许持有,不加仓,并施加小惩罚
        if current_volatility > self.max_volatility:
            action = 0  # 持有,不加仓
            risk_penalty -= 1.0

        # 执行(可能被覆盖的)动作,此处省略,假设得到 obs, reward, done, truncated, info
        reward += risk_penalty

        return obs, reward, done, truncated, info
 
    def _compute_drawdown(self):
        """计算当前回撤"""
        peak = max(self.equity_curve) if self.equity_curve else self.initial_cash
        current = self.equity_curve[-1] if self.equity_curve else self.initial_cash
        return (peak - current) / peak
 
    def _compute_volatility(self, window=20):
        """计算滚动波动率"""
        if len(self.equity_curve) < window:
            return 0
        returns = np.diff(self.equity_curve[-window:]) / self.equity_curve[-window:-1]
        return np.std(returns) * np.sqrt(252)
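上面的 `_compute_drawdown` 与 `_compute_volatility` 也可以抽成独立的工具函数,便于脱离环境单独测试。以下是一个示意实现(函数名为本文假设,非某个库的 API):

```python
import numpy as np


def compute_drawdown(equity_curve):
    """当前回撤:净值相对历史峰值的跌幅"""
    if not equity_curve:
        return 0.0
    peak = max(equity_curve)
    return (peak - equity_curve[-1]) / peak


def compute_volatility(equity_curve, window=20, periods_per_year=252):
    """基于最近 window 个净值点的年化波动率"""
    if len(equity_curve) < window:
        return 0.0
    recent = np.asarray(equity_curve[-window:], dtype=float)
    returns = np.diff(recent) / recent[:-1]
    return float(np.std(returns) * np.sqrt(periods_per_year))


# 峰值 120、当前 105,回撤 = 15 / 120 = 0.125
print(compute_drawdown([100, 120, 105]))  # → 0.125
```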

动态风险调整

class DynamicRiskAdjustment:
    """
    根据市场状态动态调整风险参数
 
    方法:
    1. VIX 类指标
    2. 实现波动率
    3. 尾部风险
    """
 
    def __init__(self):
        self.base_risk = 1.0
 
    def get_risk_multiplier(self, market_state):
        """
        根据市场状态返回风险乘数
 
        返回值范围:[0.5, 1.5]
        """
        volatility = market_state.get("volatility", 0.15)
        trend = market_state.get("trend", 0)
 
        # 低波动率 + 明确趋势:增加风险
        if volatility < 0.12 and abs(trend) > 0.05:
            return min(1.5, self.base_risk * 1.5)
 
        # 高波动率:降低风险
        if volatility > 0.25:
            return max(0.5, self.base_risk * 0.5)
 
        # 正常情况
        return self.base_risk
 
    def adjust_position_size(self, base_size, market_state):
        """调整仓位大小"""
        risk_mult = self.get_risk_multiplier(market_state)
        return base_size * risk_mult
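把上面的乘数规则抽成一个纯函数,可以直接验证三种市场状态下的边界行为(以下为示意写法,函数名为本文假设):

```python
def risk_multiplier(volatility, trend, base_risk=1.0):
    """与 DynamicRiskAdjustment.get_risk_multiplier 相同的规则,返回值范围 [0.5, 1.5]"""
    # 低波动率 + 明确趋势:增加风险
    if volatility < 0.12 and abs(trend) > 0.05:
        return min(1.5, base_risk * 1.5)
    # 高波动率:降低风险
    if volatility > 0.25:
        return max(0.5, base_risk * 0.5)
    # 正常情况
    return base_risk


print(risk_multiplier(0.10, 0.06))  # 低波动 + 强趋势 → 1.5
print(risk_multiplier(0.30, 0.00))  # 高波动 → 0.5
print(risk_multiplier(0.15, 0.00))  # 正常 → 1.0
```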

总结与最佳实践

RL 交易检查清单

□ 数据质量
  □ 覆盖多种市场环境
  □ 足够的历史数据
  □ 包含必要的特征

□ 环境设计
  □ 状态空间合理
  □ 动作空间符合交易逻辑
  □ 奖励函数与目标一致
  □ 考虑交易成本和滑点

□ 训练流程
  □ 时间序列切分
  □ 验证集监控
  □ 多次随机种子
  □ 早停机制

□ 风险管理
  □ 回撤限制
  □ 持仓上限
  □ 波动率控制
  □ 流动性约束

□ 评估方法
  □ 样本外测试
  □ 滚动窗口验证
  □ 与基准比较
  □ 压力测试
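清单中的"时间序列切分"与"滚动窗口验证"可以用一个简单的滚动窗口生成器实现:训练窗口始终在测试窗口之前,绝不让未来数据泄漏进训练集。以下是一个最小示意(函数名为本文假设):

```python
def walk_forward_splits(n_samples, train_size, test_size):
    """按时间顺序生成 (train_idx, test_idx) 滚动窗口,每次前移一个测试集长度"""
    splits = []
    start = 0
    while start + train_size + test_size <= n_samples:
        train_idx = list(range(start, start + train_size))
        test_idx = list(range(start + train_size, start + train_size + test_size))
        splits.append((train_idx, test_idx))
        start += test_size
    return splits


# 1000 天数据、600 天训练、100 天测试 → 4 个滚动窗口
for train_idx, test_idx in walk_forward_splits(1000, 600, 100):
    print(train_idx[0], train_idx[-1], "->", test_idx[0], test_idx[-1])
```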

常见问题与解决方案

| 问题 | 解决方案 |
| --- | --- |
| 训练不稳定 | 1. 降低学习率 2. 使用归一化 3. 增加批大小 |
| 过拟合 | 1. 增加数据多样性 2. 使用正则化 3. 早停 |
| 样本外差 | 1. 更保守的策略 2. 集成方法 3. 在线微调 |
| 奖励黑客 | 1. 仔细设计奖励函数 2. 使用奖励归一化 |
| 训练慢 | 1. 使用向量化环境 2. 增加并行度 3. 使用 GPU |
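针对表中"奖励黑客"一行提到的奖励归一化,一个常见做法是用运行均值与标准差对奖励做在线标准化。以下是基于 Welford 在线算法的示意实现(类名为本文假设):

```python
import math


class RunningRewardNormalizer:
    """用 Welford 在线算法维护奖励的运行均值与方差,输出标准化后的奖励"""

    def __init__(self, eps=1e-8):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # 累积平方偏差
        self.eps = eps

    def normalize(self, reward):
        self.count += 1
        delta = reward - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (reward - self.mean)
        var = self.m2 / self.count if self.count > 1 else 1.0
        return (reward - self.mean) / (math.sqrt(var) + self.eps)


norm = RunningRewardNormalizer()
for r in [1.0, 2.0, 3.0, 4.0]:
    scaled = norm.normalize(r)  # 每步返回按当前统计量标准化后的奖励
```

在训练循环中,把环境返回的原始 reward 先经过 `normalize` 再喂给算法,可以避免奖励尺度漂移导致的训练不稳定。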

推荐学习路径

  1. 基础(1-2周)

    • 完成 Part 1 的理论和代码
    • 运行简单的 DQN/PPO 示例
  2. 实践(2-4周)

    • 实现自己的交易环境
    • 训练第一个交易智能体
    • 完成完整回测
  3. 进阶(1-2月)

    • 研究 Offline RL
    • 尝试 Multi-Agent
    • 优化风险管理
  4. 实战(持续)

    • 纸上交易验证
    • 小资金试运行
    • 持续监控和调整

延伸阅读

  • 论文

    • “Deep Reinforcement Learning for Trading” (Zhang et al.)
    • “Offline Reinforcement Learning” (Levine et al.)
    • “Multi-Agent Reinforcement Learning” (Tampuu et al.)
  • 代码库

    • stable-baselines3
    • d3rlpy
    • rl-transer
  • 社区

    • r/reinforcementlearning
    • Quantopian(已关闭,但论坛有历史资料)
    • Kaggle 竞赛

恭喜! 你已经完成了强化学习交易模块的学习。记住,RL 在量化交易中是一个强大的工具,但需要谨慎使用。从简单开始,逐步迭代,始终重视风险管理。