高级主题与实战案例
Multi-Agent、多资产组合与 Offline RL:深入探索 RL 交易的高级应用
本文介绍强化学习在量化交易中的高级主题,包括 Multi-Agent RL、多资产组合优化、Offline RL 以及完整的回测案例。
Multi-Agent RL:多智能体强化学习
Multi-Agent RL(MARL)研究多个智能体如何协同或竞争。在量化交易中,这可以用于:
- 多个资产/策略的协同管理
- 市场微观结构建模(买方/卖方博弈)
- 组合风险管理
MARL 的核心概念
1. 合作 vs 竞争
- 合作 MARL:所有智能体共享奖励,目标一致
- 应用:多资产协同交易、分布式执行
- 竞争 MARL:智能体之间零和博弈
- 应用:对手盘建模、市场模拟
- 混合 MARL:既有合作又有竞争
- 应用:部分信息博弈
2. 状态与动作空间
在 MARL 中,每个智能体都有自己的观察和动作:
全局状态 s^t 包含所有信息,但每个智能体只能看到部分观察。
3. 奖励分配
如何将全局奖励分配给各个智能体是 MARL 的关键问题:
- 全局奖励:所有智能体共享同一个奖励
- 局部奖励:每个智能体有自己的奖励
- 差异化奖励:考虑个体贡献的奖励分配
简单的 Multi-Agent 交易环境
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class MultiAssetTradingEnv(gym.Env):
    """Cooperative multi-agent trading environment.

    Each agent manages exactly one asset; the shared objective is the
    performance of the whole portfolio (reward = portfolio return minus a
    concentration penalty that encourages diversification).
    """

    def __init__(self, price_data, n_agents=5):
        """
        Args:
            price_data: dict mapping asset name -> pandas price Series.
            n_agents: requested number of agents (one per asset).
        """
        super().__init__()
        self.price_data = price_data
        self.asset_names = list(price_data.keys())[:n_agents]
        # FIX: clamp to the number of assets actually available; the
        # original stored n_agents unchanged, so self.asset_names[i]
        # raised IndexError whenever n_agents > len(price_data).
        self.n_agents = len(self.asset_names)
        # Per-agent discrete actions: 0=hold, 1=buy, 2=sell
        self.action_space = spaces.MultiDiscrete([3] * self.n_agents)
        # Each agent observes only its own asset: 19 returns + 1 position
        self.observation_space = spaces.Tuple([
            spaces.Box(low=-np.inf, high=np.inf, shape=(20,), dtype=np.float32)
            for _ in range(self.n_agents)
        ])
        # Environment state
        self.current_step = 0
        self.positions = np.zeros(self.n_agents)
        self.cash = 10000.0
        self.initial_cash = 10000.0

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = 20  # need 20 steps of price history for the window
        self.positions = np.zeros(self.n_agents)
        self.cash = self.initial_cash
        return self._get_obs(), {}

    def _get_obs(self):
        """Build the per-agent observation tuple (each of shape (20,))."""
        observations = []
        for i, asset in enumerate(self.asset_names):
            prices = self.price_data[asset].values
            window_prices = prices[self.current_step - 20:self.current_step]
            returns = np.diff(window_prices) / window_prices[:-1]
            obs = np.concatenate([
                returns,                    # 19 window returns
                [self.positions[i] / 100],  # own (scaled) position
            ])
            observations.append(obs.astype(np.float32))
        return tuple(observations)

    def step(self, actions):
        """Apply one action per agent, advance time, compute the shared reward.

        Args:
            actions: sequence of length n_agents with entries in {0, 1, 2}.
        """
        old_value = self._get_portfolio_value()
        # Each agent trades its own asset.
        for i, action in enumerate(actions):
            asset = self.asset_names[i]
            price = self.price_data[asset].values[self.current_step]
            if action == 1:  # buy with a per-agent slice of the cash pool
                amount = self.cash * 0.1 / self.n_agents
                if amount > price:
                    shares = int(amount / price)
                    self.positions[i] += shares
                    self.cash -= shares * price * 1.001  # 0.1% commission
            elif action == 2:  # sell half of the current position
                if self.positions[i] > 0:
                    shares = int(self.positions[i] * 0.5)
                    self.cash += shares * price * 0.999
                    self.positions[i] -= shares
        # Advance time
        self.current_step += 1
        # Shared reward: portfolio return (scaled), minus concentration penalty.
        new_value = self._get_portfolio_value()
        portfolio_return = (new_value - old_value) / old_value
        reward = portfolio_return * 100
        # Penalize putting too much of the book into a single asset.
        position_values = np.array([
            self.positions[i] * self.price_data[self.asset_names[i]].values[self.current_step]
            for i in range(self.n_agents)
        ])
        if np.sum(position_values) > 0:
            concentration = np.max(np.abs(position_values)) / np.sum(position_values)
            reward -= 0.1 * (concentration - 0.5)  # encourage diversification
        done = self.current_step >= len(next(iter(self.price_data.values()))) - 1
        return self._get_obs(), reward, done, False, {
            "portfolio_value": new_value,
            "return": (new_value - self.initial_cash) / self.initial_cash,
        }

    def _get_portfolio_value(self):
        """Mark the book to current prices (cash plus all positions)."""
        value = self.cash
        for i, asset in enumerate(self.asset_names):
            price = self.price_data[asset].values[self.current_step]
            value += self.positions[i] * price
        return value
# 使用示例
if __name__ == "__main__":
import pandas as pd
# 生成多个资产的价格数据
np.random.seed(42)
n_days = 1000
price_data = {}
for i in range(5):
returns = np.random.normal(0.0001, 0.02, n_days)
prices = 100 * np.exp(np.cumsum(returns))
price_data[f"asset_{i}"] = pd.Series(prices)
# 创建环境
env = MultiAssetTradingEnv(price_data, n_agents=5)
# 测试
obs, info = env.reset()
print(f"观察空间: {len(obs)} 个智能体,每个 {obs[0].shape}")
for _ in range(10):
actions = [env.action_space[i].sample() for i in range(env.n_agents)]
obs, reward, done, truncated, info = env.step(actions)
print(f"Reward: {reward:.4f}, Portfolio Value: {info['portfolio_value']:.2f}")
if done:
            break
MAPPO:Multi-Agent PPO
MAPPO 是 PPO 在多智能体场景的扩展,简单有效。
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
class MAPPOAgent:
    """Simplified MAPPO (Multi-Agent PPO) implementation.

    Core ideas:
      1. one independent actor network per agent;
      2. a single shared critic that scores the *joint* observation;
      3. centralized training (critic) with decentralized execution (actors).
    """

    def __init__(self, n_agents, obs_dim, action_dim, hidden_dim=64):
        # n_agents: number of cooperating agents
        # obs_dim: per-agent observation size
        # action_dim: number of discrete actions available to each agent
        self.n_agents = n_agents
        self.obs_dim = obs_dim
        self.action_dim = action_dim
        # One actor per agent; Softmax output gives action probabilities.
        self.actors = nn.ModuleList([
            nn.Sequential(
                nn.Linear(obs_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim),
                nn.Softmax(dim=-1)
            )
            for _ in range(n_agents)
        ])
        # Shared critic over the concatenation of all agents' observations.
        self.critic = nn.Sequential(
            nn.Linear(obs_dim * n_agents, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, 1)
        )
        # Single optimizer over all actor and critic parameters.
        self.optimizer = torch.optim.Adam(
            list(self.actors.parameters()) + list(self.critic.parameters()),
            lr=3e-4
        )

    def select_actions(self, observations):
        """Sample one action per agent; returns (action ints, stacked log-probs)."""
        actions = []
        log_probs = []
        for i, obs in enumerate(observations):
            obs_tensor = torch.FloatTensor(obs)
            action_probs = self.actors[i](obs_tensor)
            dist = Categorical(action_probs)
            action = dist.sample()
            actions.append(action.item())
            log_probs.append(dist.log_prob(action))
        return actions, torch.stack(log_probs)

    def get_value(self, observations):
        """Centralized value estimate of the joint observation."""
        # Concatenate every agent's observation into one flat vector.
        concat_obs = torch.cat([torch.FloatTensor(obs) for obs in observations])
        return self.critic(concat_obs)

    def update(self, trajectories, gamma=0.99, epsilon=0.2):
        """Run PPO updates on the collected trajectories.

        Args:
            trajectories: list of episodes; each episode is a list of
                (obs, actions, log_probs, reward, next_obs, done) tuples.
            gamma: discount factor.
            epsilon: PPO clipping range.

        NOTE(review): several index usages in the PPO loop below look
        inconsistent (time vs agent vs trajectory indices) — see inline
        notes; verify before production use.
        """
        # Discounted returns and simple advantages, one entry per time step.
        returns = []
        advantages = []
        for traj in trajectories:
            obs_list, actions_list, old_log_probs, rewards, next_obs_list, dones = zip(*traj)
            # Backward pass over the episode accumulating discounted returns.
            R = 0
            episode_returns = []
            for r, d in zip(reversed(rewards), reversed(dones)):
                R = r + gamma * R * (1 - d)
                episode_returns.insert(0, R)
            returns.extend(episode_returns)
            # Simple advantage estimate: discounted return minus critic value.
            episode_advantages = []
            for i, (obs, ret) in enumerate(zip(obs_list, episode_returns)):
                value = self.get_value(obs)
                advantage = ret - value.item()
                episode_advantages.append(advantage)
            advantages.extend(episode_advantages)
        returns = torch.FloatTensor(returns)
        advantages = torch.FloatTensor(advantages)
        # Normalize advantages for numerical stability.
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # PPO epochs (simplified: full data each epoch).
        for _ in range(10):
            batch_indices = np.random.permutation(len(trajectories))
            for idx in batch_indices:
                obs_list, actions_list, old_log_probs, _, _, _ = zip(*trajectories[idx])
                for agent_idx in range(self.n_agents):
                    # NOTE(review): obs_list/actions_list are indexed by TIME
                    # step (zip over the episode), but agent_idx is an AGENT
                    # index — this only lines up when episode length equals
                    # n_agents. Likely a bug; confirm intended indexing.
                    obs = torch.FloatTensor(obs_list[agent_idx])
                    action = torch.LongTensor([actions_list[agent_idx]])
                    old_log_prob = old_log_probs[agent_idx]
                    # Re-evaluate the current policy on the stored observation.
                    action_probs = self.actors[agent_idx](obs)
                    dist = Categorical(action_probs)
                    new_log_prob = dist.log_prob(action)
                    # Importance ratio between new and old policies.
                    ratio = torch.exp(new_log_prob - old_log_prob)
                    # NOTE(review): advantages/returns hold one entry per time
                    # step across all episodes, yet idx is a TRAJECTORY index —
                    # verify this alignment as well.
                    advantage = advantages[idx]
                    # Clipped PPO surrogate objective.
                    surr1 = ratio * advantage
                    surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantage
                    actor_loss = -torch.min(surr1, surr2)
                    # Centralized critic loss against the discounted return.
                    # NOTE(review): obs_list spans all time steps here, so this
                    # concatenation matches the critic input width only when
                    # episode length equals n_agents — confirm.
                    concat_obs = torch.cat([torch.FloatTensor(o) for o in obs_list])
                    value = self.critic(concat_obs)
                    critic_loss = F.mse_loss(value, returns[idx:idx+1])
                    # Combined loss; 0.5 weights the value-function term.
                    loss = actor_loss + 0.5 * critic_loss
                    self.optimizer.zero_grad()
                    loss.backward()
                    self.optimizer.step()
# 简化的训练循环
if __name__ == "__main__":
import pandas as pd
# 生成数据
np.random.seed(42)
n_days = 1000
price_data = {}
for i in range(5):
returns = np.random.normal(0.0001, 0.02, n_days)
prices = 100 * np.exp(np.cumsum(returns))
price_data[f"asset_{i}"] = pd.Series(prices)
env = MultiAssetTradingEnv(price_data, n_agents=5)
# 创建 MAPPO agent
agent = MAPPOAgent(
n_agents=5,
obs_dim=20,
action_dim=3
)
# 训练
n_episodes = 100
for episode in range(n_episodes):
obs, _ = env.reset()
episode_rewards = []
trajectory = []
for step in range(100):
# 选择动作
actions, log_probs = agent.select_actions(obs)
# 执行
next_obs, reward, done, truncated, info = env.step(actions)
episode_rewards.append(reward)
# 存储轨迹
trajectory.append((obs, actions, log_probs, reward, next_obs, done))
obs = next_obs
if done:
break
# 定期更新
if len(trajectory) > 0:
agent.update([trajectory])
if episode % 10 == 0:
print(f"Episode {episode}, Total Reward: {sum(episode_rewards):.2f}, "
                  f"Return: {info['return']:.2%}")
多资产组合优化
RL 可以用于动态资产配置,根据市场状态调整组合权重。
Black-Litterman 与 RL 结合
class DynamicAssetAllocationEnv(gym.Env):
    """Dynamic asset-allocation environment.

    State:  market features over a rolling window (per-asset mean return,
            volatility, and pairwise correlations).
    Action: unnormalized asset scores, mapped to portfolio weights via
            softmax (so weights are positive and sum to 1).
    Reward: realized portfolio return minus a volatility penalty.
    """

    def __init__(self, returns_data, window=252):
        """
        Args:
            returns_data: DataFrame of daily asset returns (rows = days).
            window: rolling lookback length in days.
        """
        super().__init__()
        self.returns = returns_data
        self.n_assets = returns_data.shape[1]
        self.window = window
        # FIX: the original declared n_assets * 3 features, but _get_obs
        # emits n mean returns + n volatilities + n*(n-1)/2 upper-triangular
        # correlations — the declared shape did not match the emitted data.
        n_features = 2 * self.n_assets + self.n_assets * (self.n_assets - 1) // 2
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )
        # Raw (unbounded) scores; converted to valid weights in step().
        self.action_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.n_assets,), dtype=np.float32
        )
        self.current_step = window
        self.weights = np.ones(self.n_assets) / self.n_assets

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = self.window
        self.weights = np.ones(self.n_assets) / self.n_assets
        return self._get_obs(), {}

    def _get_obs(self):
        """Rolling-window market features: means, stds, correlations."""
        window_returns = self.returns[self.current_step - self.window:self.current_step]
        # Per-asset mean return and volatility over the window.
        mean_returns = window_returns.mean(axis=0).values
        std_returns = window_returns.std(axis=0).values
        # Upper triangle of the correlation matrix (diagonal excluded).
        corr_matrix = window_returns.corr().values
        corr_features = corr_matrix[np.triu_indices(self.n_assets, 1)]
        features = np.concatenate([mean_returns, std_returns, corr_features])
        return features.astype(np.float32)

    def step(self, action):
        """Rebalance to softmax(action), then realize one day of returns.

        Args:
            action: raw per-asset scores (converted to weights here).
        """
        # Numerically stable softmax. FIX: raw np.exp(action) overflows for
        # large scores since the action space is unbounded; subtracting the
        # max leaves the weights unchanged.
        shifted = action - np.max(action)
        weights = np.exp(shifted)
        weights = weights / np.sum(weights)
        self.weights = weights
        # Realized portfolio return for the current day.
        actual_returns = self.returns.iloc[self.current_step].values
        portfolio_return = np.dot(weights, actual_returns)
        # Risk penalty: annualized portfolio volatility from the rolling
        # covariance of the lookback window.
        window_returns = self.returns[self.current_step - self.window:self.current_step]
        portfolio_volatility = np.sqrt(
            np.dot(weights.T, np.dot(window_returns.cov().values * 252, weights))
        )
        reward = portfolio_return * 100 - 0.5 * portfolio_volatility
        self.current_step += 1
        done = self.current_step >= len(self.returns) - 1
        return self._get_obs(), reward, done, False, {
            "weights": weights,
            "return": portfolio_return,
            "volatility": portfolio_volatility,
        }
# Train the asset-allocation policy with SAC
if __name__ == "__main__":
    from stable_baselines3 import SAC
    # FIX: pandas was not imported in this script section even though
    # pd.DataFrame is used below.
    import pandas as pd

    # Generate correlated multi-asset return data
    np.random.seed(42)
    n_days = 2000
    n_assets = 5
    mean_returns = np.random.normal(0.0001, 0.0001, n_assets)
    cov_matrix = np.random.uniform(0.0001, 0.001, (n_assets, n_assets))
    cov_matrix = (cov_matrix + cov_matrix.T) / 2  # symmetrize
    np.fill_diagonal(cov_matrix, 0.001)           # fixed per-asset variance
    returns = np.random.multivariate_normal(mean_returns, cov_matrix, n_days)
    returns_df = pd.DataFrame(returns, columns=[f"asset_{i}" for i in range(n_assets)])

    # Create the environment
    env = DynamicAssetAllocationEnv(returns_df)

    # SAC handles the continuous (unbounded-score) action space
    model = SAC(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        buffer_size=100000,
        learning_starts=1000,
        batch_size=256,
        gamma=0.99,
        verbose=1,
    )
    print("开始训练资产配置模型...")
    model.learn(total_timesteps=50000)

    # Greedy evaluation rollout
    obs, _ = env.reset()
    weights_history = []
    for _ in range(100):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        weights_history.append(info["weights"])
        if done:
            break
    print(f"\n平均权重: {np.mean(weights_history, axis=0)}")
    print(f"权重标准差: {np.std(weights_history, axis=0)}")
Hierarchical RL:分层强化学习
分层 RL 将决策问题分解为不同层次,适合复杂的交易场景。
class HierarchicalTradingAgent:
    """Hierarchical trading agent.

    High level (manager): picks a strategy mode (trend following /
    mean reversion / volatility).
    Low level (worker): one policy per mode that picks the concrete
    trading action (buy / sell / hold).
    """

    def __init__(self, state_dim, n_modes=3):
        # Keep the mode count — it sizes the one-hot mode encoding that is
        # concatenated to the state for the workers.
        self.n_modes = n_modes
        # Manager: state -> distribution over strategy modes.
        self.manager = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, n_modes),
            nn.Softmax(dim=-1)
        )
        # Workers: one per mode; input is the state plus the one-hot mode.
        # FIX: the original declared an input width of state_dim + 1 but
        # concatenated a one-hot of size 3, which crashed at forward time;
        # the worker input must be state_dim + n_modes.
        self.workers = nn.ModuleList([
            nn.Sequential(
                nn.Linear(state_dim + n_modes, 64),
                nn.ReLU(),
                nn.Linear(64, 3),  # buy / sell / hold
                nn.Softmax(dim=-1)
            )
            for _ in range(n_modes)
        ])

    def select_mode(self, state):
        """High level: sample a strategy mode from the manager policy."""
        state_tensor = torch.FloatTensor(state)
        mode_probs = self.manager(state_tensor)
        mode = torch.distributions.Categorical(mode_probs).sample()
        return mode.item()

    def select_action(self, state, mode):
        """Low level: sample a concrete action from the chosen mode's worker."""
        # FIX: the one-hot size must follow n_modes instead of a hard-coded 3.
        mode_onehot = F.one_hot(torch.tensor(mode), num_classes=self.n_modes).float()
        state_tensor = torch.FloatTensor(state)
        input_vec = torch.cat([state_tensor, mode_onehot])
        action_probs = self.workers[mode](input_vec)
        action = torch.distributions.Categorical(action_probs).sample()
        return action.item()

    def forward(self, state):
        """Full two-level decision: returns (mode, action)."""
        mode = self.select_mode(state)
        action = self.select_action(state, mode)
        return mode, action
# 策略模式定义
TRADING_MODES = {
0: "trend_following", # 趋势跟踪
1: "mean_reversion", # 均值回归
2: "volatility_breakout", # 波动突破
}
Offline RL 实战
Offline RL 使用固定数据集训练,不需要与环境交互,非常适合量化交易。
CQL:Conservative Q-Learning
CQL 通过在 Q 值更新中加入保守惩罚,避免对未见过的状态-动作对过度乐观。
"""
Offline RL 实战:使用 d3rlpy
d3rlpy 是一个专门的 Offline RL 库,实现了 CQL、BCQ 等算法。
"""
# 安装:pip install d3rlpy
import d3rlpy
import numpy as np
import pandas as pd
def prepare_offline_dataset(price_data, signals, actions):
    """Build a d3rlpy MDPDataset from historical prices and actions.

    Args:
        price_data: pandas Series of prices.
        signals: historical signals (currently unused — kept for interface
            compatibility; TODO wire into the observation features).
        actions: historical action sequence aligned with price_data.

    Returns:
        d3rlpy.dataset.MDPDataset
    """
    n_samples = len(price_data)
    # Observations: 19 window returns + SMA ratio + momentum per step.
    observations = []
    for i in range(20, n_samples):
        # FIX: convert the window to a plain array — label-based Series
        # access such as window_prices[0] raises KeyError once the slice
        # no longer starts at label 0.
        window_prices = np.asarray(price_data[i-20:i])
        returns = np.diff(window_prices) / window_prices[:-1]
        # Simple technical features
        sma_short = window_prices[-5:].mean()
        sma_long = window_prices[-20:].mean()
        momentum = (window_prices[-1] - window_prices[0]) / window_prices[0]
        obs = np.concatenate([
            returns,
            [sma_short / sma_long, momentum]
        ])
        observations.append(obs)
    observations = np.array(observations)
    # Historical trading actions (simplified; in practice extract these
    # from real trade records).
    actions = np.array(actions[20:n_samples])
    # Rewards. FIX: the reward for the action at step i must be the *next*
    # step's return (i -> i+1). The original used the return ending at i,
    # which is already inside the observation window, so the reward was
    # independent of the action and leaked past information.
    future_returns = price_data.pct_change().shift(-1).fillna(0.0)
    rewards = future_returns.values[20:n_samples] * 100
    # Episode terminal flags: one long episode ending on the last day.
    terminals = np.zeros(n_samples - 20, dtype=bool)
    terminals[-1] = True
    # Assemble the d3rlpy dataset
    dataset = d3rlpy.dataset.MDPDataset(
        observations=observations,
        actions=actions,
        rewards=rewards,
        terminals=terminals,
    )
    return dataset
def train_cql_model(dataset, experiment_name="cql_trading"):
    """Fit a Conservative Q-Learning model on an offline dataset.

    Args:
        dataset: d3rlpy MDPDataset of historical transitions.
        experiment_name: directory name used for training logs.

    Returns:
        The trained CQL model.
    """
    # Conservative Q-Learning — alpha controls how pessimistic the
    # Q-estimates are for out-of-distribution actions.
    algo = d3rlpy.algos.CQL(
        learning_rate=1e-4,
        batch_size=256,
        alpha=1.0,
        use_gpu=True,
    )
    # Logging setup
    algo.create_experiment_dir(
        name=experiment_name,
        logdir="./d3rlpy_logs/"
    )
    # Offline training: no environment interaction required.
    algo.fit(
        dataset,
        n_steps=100000,
        n_steps_per_epoch=1000,
        save_interval=10,
    )
    return algo
def evaluate_cql_model(model, test_data):
    """Evaluate a trained CQL model on held-out price data.

    Args:
        model: trained d3rlpy CQL model.
        test_data: DataFrame with at least a "close" column.
    """
    # Build the evaluation dataset from the test prices.
    test_dataset = prepare_offline_dataset(
        test_data["close"],
        test_data.get("signals", np.zeros(len(test_data))),
        np.zeros(len(test_data))  # placeholder actions
    )
    # FIX: the original referenced the undefined global name `cql` instead
    # of the `model` parameter, raising NameError at call time.
    evaluation = model.evaluate(test_dataset)
    print(f"CQL 模型评估结果:")
    print(f"平均奖励: {evaluation['mean_reward']:.4f}")
    print(f"标准差: {evaluation['std_reward']:.4f}")
# Example usage
if __name__ == "__main__":
    # Synthetic price history
    np.random.seed(42)
    n_days = 2000
    prices = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, n_days)))
    price_df = pd.DataFrame({"close": prices})

    # Stand-in for real trade records: random historical actions
    # (in practice these come from actual executions).
    historical_actions = np.random.randint(0, 3, n_days)

    # Chronological train/test split
    train_df = price_df[:1500]
    test_df = price_df[1500:]

    train_dataset = prepare_offline_dataset(
        train_df["close"],
        np.zeros(len(train_df)),
        historical_actions[:1500]
    )

    # Train CQL offline
    print("训练 CQL 模型...")
    cql = train_cql_model(train_dataset)

    # Evaluate on the held-out period
    print("\n评估 CQL 模型...")
    evaluate_cql_model(cql, test_df)
BCQ:Batch Constrained Q-learning
BCQ 限制动作选择范围,只选择接近数据集的动作,减少分布偏移。
"""
BCQ 实现示例(简化版)
完整的 BCQ 实现较复杂,建议使用 d3rlpy 或 rl-transer
"""
class BCQGenerator(nn.Module):
    """BCQ generator network.

    Proposes actions close to what the behavior policy produced; the
    final tanh keeps every output component in [-1, 1].
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        layers = [
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, state):
        # Squash into the normalized action range.
        raw = self.net(state)
        return torch.tanh(raw)
class BCQAgent:
    """Batch-Constrained Q-learning agent (simplified).

    Core idea:
      1. a generator proposes actions similar to the behavior policy
         (a VAE in the full algorithm);
      2. a perturbation network makes small bounded adjustments;
      3. the Q-network picks the best-scoring candidate.
    """

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        # Candidate-action generator (simplified stand-in for the VAE).
        self.generator = BCQGenerator(state_dim, action_dim)
        # State-action value network.
        self.q_network = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
        # Perturbation network: bounded tweak of generated actions.
        self.perturbation = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Tanh()
        )

    def select_action(self, state, n_candidates=10):
        """Pick the highest-Q candidate action for the given state.

        Args:
            state: current observation (array-like).
            n_candidates: number of candidate actions to generate.

        Returns:
            The best candidate action as a numpy array.
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            tiled_state = state_tensor.repeat(n_candidates, 1)
            # Candidate actions near the behavior policy
            actions = self.generator(tiled_state)
            # Small bounded perturbation of each candidate
            perturbations = self.perturbation(
                torch.cat([tiled_state, actions], dim=1)
            )
            actions = torch.clamp(actions + 0.05 * perturbations, -1, 1)
            # Score every candidate and keep the argmax
            q_values = self.q_network(
                torch.cat([tiled_state, actions], dim=1)
            )
            best_idx = q_values.argmax()
        return actions[best_idx].squeeze().numpy()
完整回测案例
案例 1:单资产 PPO 交易
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
class FullBacktestEnv(gym.Env):
    """Complete single-asset backtesting environment.

    Features:
      1. commission and slippage applied on every fill;
      2. several technical indicators in the observation;
      3. trade log and equity curve kept for post-hoc analysis.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        initial_cash: float = 100000,
        commission: float = 0.001,
        slippage: float = 0.0001,
        window_size: int = 60,
    ):
        """
        Args:
            df: bar data with at least "close", "high", "low" columns.
            initial_cash: starting account balance.
            commission: proportional fee per trade.
            slippage: proportional price impact per fill.
            window_size: lookback length for the return features.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_cash = initial_cash
        self.commission = commission
        self.slippage = slippage
        self.window_size = window_size
        # Actions: 0=hold, 1=buy, 2=sell, 3=close position
        self.action_space = spaces.Discrete(4)
        # Observation: returns window + indicators + account state
        n_features = self._compute_n_features()
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )
        # Bookkeeping
        self.trade_log = []
        self.equity_curve = []
        # Running equity peak for O(1) drawdown tracking. FIX: the original
        # recomputed max(equity_curve) on every step — O(n) per step.
        self._peak = initial_cash

    def _compute_n_features(self):
        """Total observation width."""
        price_features = self.window_size  # returns, left-padded to the window
        indicator_features = 5             # RSI, BB position, MACD, signal, ATR
        account_features = 4               # position, cash, equity, return
        return price_features + indicator_features + account_features

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.position = 0
        self.cash = self.initial_cash
        self.trade_log = []
        self.equity_curve = [self.initial_cash]
        self._peak = self.initial_cash
        return self._get_obs(), {}

    def _get_obs(self):
        """Assemble the observation vector (float32)."""
        if self.current_step < self.window_size:
            # FIX: match the declared float32 dtype of observation_space
            # (the original returned a float64 zeros array here).
            return np.zeros(self._compute_n_features(), dtype=np.float32)
        # 1. Price features: window_size-1 returns, left-padded with a zero.
        prices = self.df["close"].values
        window_prices = prices[self.current_step - self.window_size:self.current_step]
        returns = np.diff(window_prices) / window_prices[:-1]
        price_features = np.concatenate([[0], returns])
        # 2. Technical indicators (each normalized below).
        rsi = self._compute_rsi(prices, self.current_step)
        macd, signal = self._compute_macd(prices, self.current_step)
        bb_upper, bb_lower = self._compute_bollinger(prices, self.current_step)
        atr = self._compute_atr(prices, self.current_step)
        current_price = prices[self.current_step]
        indicator_features = np.array([
            rsi / 100,  # scale RSI to [0, 1]
            (current_price - bb_lower) / (bb_upper - bb_lower + 1e-8),
            macd / current_price,
            signal / current_price,
            atr / current_price,
        ])
        # 3. Account features, all scaled by the initial cash.
        total_value = self.cash + self.position * current_price
        account_features = np.array([
            self.position / self.initial_cash,
            self.cash / self.initial_cash,
            total_value / self.initial_cash,
            (total_value - self.initial_cash) / self.initial_cash,
        ])
        obs = np.concatenate([price_features, indicator_features, account_features])
        return obs.astype(np.float32)

    def _compute_rsi(self, prices, idx, period=14):
        """Simple (non-smoothed) RSI over the trailing period."""
        if idx < period:
            return 50  # neutral default until enough history exists
        window = prices[idx-period:idx+1]
        deltas = np.diff(window)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        avg_gain = np.mean(gains)
        avg_loss = np.mean(losses)
        if avg_loss == 0:
            return 100
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    def _compute_macd(self, prices, idx, fast=12, slow=26, signal=9):
        """MACD with a simplified signal line (0.9 * MACD)."""
        if idx < slow:
            return 0, 0
        window = prices[idx-slow:idx+1]
        ema_fast = pd.Series(window).ewm(span=fast).mean().iloc[-1]
        ema_slow = pd.Series(window).ewm(span=slow).mean().iloc[-1]
        macd = ema_fast - ema_slow
        # NOTE: a real signal line is an EMA of the MACD series; 0.9*macd
        # is a deliberate simplification here.
        return macd, macd * 0.9

    def _compute_bollinger(self, prices, idx, period=20, std=2):
        """Bollinger bands: SMA +/- std * sigma over the trailing period."""
        if idx < period:
            return prices[idx] * 1.02, prices[idx] * 0.98  # rough fallback band
        window = prices[idx-period:idx+1]
        sma = np.mean(window)
        std_dev = np.std(window)
        return sma + std * std_dev, sma - std * std_dev

    def _compute_atr(self, prices, idx, period=14):
        """Average True Range from the high/low/close columns."""
        if idx < period:
            return prices[idx] * 0.02  # rough fallback: 2% of price
        high = self.df["high"].values[idx-period:idx+1]
        low = self.df["low"].values[idx-period:idx+1]
        close = self.df["close"].values[idx-period:idx+1]
        tr = np.maximum(
            high[1:] - low[1:],
            np.maximum(abs(high[1:] - close[:-1]), abs(low[1:] - close[:-1]))
        )
        return np.mean(tr)

    def step(self, action):
        """Execute one action, advance one bar, and score the result."""
        current_price = self.df["close"].values[self.current_step]
        old_value = self.cash + self.position * current_price
        # Apply the trade at the pre-step price.
        self._execute_action(action, current_price)
        # Advance time
        self.current_step += 1
        terminated = self.current_step >= len(self.df) - 1
        truncated = False
        # Mark-to-market after the time step.
        new_price = self.df["close"].values[self.current_step]
        new_value = self.cash + self.position * new_price
        # Reward: position P&L, minus trading-cost and drawdown penalties.
        price_change = (new_price - current_price) / current_price
        position_return = price_change * (self.position * current_price) / old_value
        reward = position_return * 100
        # Flat penalty whenever a trading action was requested.
        if action in [1, 2, 3]:
            reward -= 0.1
        # Drawdown penalty past a 5% tolerance, using the running peak.
        drawdown = (self._peak - new_value) / self._peak
        reward -= 0.5 * max(0, drawdown - 0.05)
        # Bookkeeping
        self.equity_curve.append(new_value)
        self._peak = max(self._peak, new_value)
        info = {
            "total_value": new_value,
            "return": (new_value - self.initial_cash) / self.initial_cash,
            "position": self.position,
            "price": new_price,
        }
        return self._get_obs(), reward, terminated, truncated, info

    def _execute_action(self, action, price):
        """Apply a trade at the given price with slippage and commission."""
        if action == 1:  # buy with 20% of available cash
            buy_value = self.cash * 0.2
            if buy_value > price:
                # Buys fill slightly above the quoted price.
                execution_price = price * (1 + self.slippage)
                shares = int(buy_value / execution_price)
                cost = shares * execution_price * (1 + self.commission)
                if cost <= self.cash:
                    self.position += shares
                    self.cash -= cost
                    self.trade_log.append({
                        "step": self.current_step,
                        "action": "buy",
                        "shares": shares,
                        "price": execution_price,
                    })
        elif action == 2:  # sell 20% of the position
            if self.position > 0:
                shares = int(self.position * 0.2)
                if shares > 0:
                    execution_price = price * (1 - self.slippage)
                    revenue = shares * execution_price * (1 - self.commission)
                    self.position -= shares
                    self.cash += revenue
                    self.trade_log.append({
                        "step": self.current_step,
                        "action": "sell",
                        "shares": shares,
                        "price": execution_price,
                    })
        elif action == 3:  # close the whole position
            if self.position > 0:
                execution_price = price * (1 - self.slippage)
                revenue = self.position * execution_price * (1 - self.commission)
                self.cash += revenue
                self.trade_log.append({
                    "step": self.current_step,
                    "action": "close",
                    "shares": self.position,
                    "price": execution_price,
                })
                self.position = 0
            elif self.position < 0:  # cover a short position
                execution_price = price * (1 + self.slippage)
                cost = -self.position * execution_price * (1 + self.commission)
                self.cash -= cost
                self.trade_log.append({
                    "step": self.current_step,
                    "action": "close",
                    "shares": -self.position,
                    "price": execution_price,
                })
                self.position = 0
# Synthetic OHLCV data for the full backtest
def generate_full_backtest_data(n_days=5000):
    """Simulate a daily price series with drifting trend and volatility.

    Returns a DataFrame with close/high/low/volume columns of length
    n_days; the close series starts at 100.
    """
    np.random.seed(42)  # reproducible fixture
    dt = 1 / 252  # one trading day in years
    closes = [100]
    for day in range(n_days - 1):
        # Volatility and trend both cycle slowly over time.
        sigma = 0.1 + 0.1 * abs(np.sin(day / 500))
        drift = 0.03 * np.sin(day / 1000)
        step_return = np.random.normal(drift * dt, sigma * np.sqrt(dt))
        closes.append(closes[-1] * (1 + step_return))
    # Highs/lows are random offsets around the close.
    highs = [c * (1 + abs(np.random.normal(0, 0.005))) for c in closes]
    lows = [c * (1 - abs(np.random.normal(0, 0.005))) for c in closes]
    volumes = np.random.randint(1000000, 10000000, n_days)
    return pd.DataFrame({
        "close": closes,
        "high": highs,
        "low": lows,
        "volume": volumes,
    })
# Full training-and-evaluation pipeline
class TradingCallback(BaseCallback):
    """Minimal SB3 training callback (placeholder for reward collection)."""

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []  # populated by future extensions

    def _on_step(self):
        # Returning True keeps training running.
        return True
def run_full_backtest():
"""运行完整回测"""
# 1. 生成数据
print("生成数据...")
df = generate_full_backtest_data(5000)
# 划分训练/测试集
train_size = int(0.7 * len(df))
train_df = df[:train_size]
test_df = df[train_size:]
print(f"训练集: {len(train_df)} 天")
print(f"测试集: {len(test_df)} 天")
# 2. 创建环境
train_env = FullBacktestEnv(train_df)
test_env = FullBacktestEnv(test_df)
# 3. 训练 PPO
print("\n训练 PPO 模型...")
model = PPO(
"MlpPolicy",
train_env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
verbose=1,
)
model.learn(total_timesteps=100000, callback=TradingCallback())
# 4. 评估
print("\n评估模型...")
obs, info = test_env.reset()
done, truncated = False, False
test_equity = []
test_actions = []
while not (done or truncated):
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, truncated, info = test_env.step(action)
test_equity.append(info["total_value"])
test_actions.append(action)
# 5. 计算指标
returns = np.diff(test_equity) / test_equity[:-1]
sharpe = np.mean(returns) / np.std(returns) * np.sqrt(252) if np.std(returns) > 0 else 0
max_drawdown = (max(test_equity) - min(test_equity)) / max(test_equity)
total_return = (test_equity[-1] - test_equity[0]) / test_equity[0]
print(f"\n=== 回测结果 ===")
print(f"总收益率: {total_return:.2%}")
print(f"夏普比率: {sharpe:.2f}")
print(f"最大回撤: {max_drawdown:.2%}")
print(f"最终净值: {test_equity[-1]:.2f}")
# 6. 与基准比较
# 买入持有
buy_hold_return = (test_df["close"].iloc[-1] - test_df["close"].iloc[0]) / test_df["close"].iloc[0]
print(f"\n买入持有收益率: {buy_hold_return:.2%}")
# 7. 可视化
plt.figure(figsize=(12, 6))
plt.plot(test_equity, label="PPO Strategy")
plt.plot(
[test_env.initial_cash * (1 + buy_hold_return * (i / len(test_equity))) for i in range(len(test_equity))],
label="Buy & Hold",
linestyle="--"
)
plt.xlabel("Days")
plt.ylabel("Portfolio Value")
plt.title("Trading Strategy Backtest")
plt.legend()
plt.grid(True)
plt.savefig("./rl_backtest_results.png")
print("\n图表已保存到 ./rl_backtest_results.png")
return model, test_equity
if __name__ == "__main__":
    model, equity = run_full_backtest()
风险管理与 RL
带风险约束的 RL
class RiskConstrainedEnv(gym.Env):
    """Trading environment with hard risk constraints (illustrative sketch).

    Constraints:
      1. maximum drawdown limit;
      2. maximum rolling-volatility limit;
      3. (planned) position concentration limits.

    NOTE(review): step() below is schematic — obs/done/truncated/info are
    never defined in this snippet and `reward` is only assigned on the
    drawdown branch; flesh these out before running.
    """

    def __init__(self, df, max_drawdown=0.15, max_volatility=0.2):
        super().__init__()
        self.df = df
        self.max_drawdown = max_drawdown
        self.max_volatility = max_volatility
        # Remaining initialization elided...

    def step(self, action):
        # (Action execution elided...)
        # Risk checks after applying the action
        current_drawdown = self._compute_drawdown()
        current_volatility = self._compute_volatility()
        # Breaching the drawdown limit forces a full liquidation.
        if current_drawdown > self.max_drawdown:
            action = 3  # forced close
            reward = -10  # large penalty
        # Excess volatility blocks any new exposure.
        if current_volatility > self.max_volatility:
            action = 0  # hold only, no adding
            reward -= 1  # mild penalty
        return obs, reward, done, truncated, info

    def _compute_drawdown(self):
        """Drawdown of the latest equity point from the running peak."""
        peak = max(self.equity_curve) if self.equity_curve else self.initial_cash
        current = self.equity_curve[-1] if self.equity_curve else self.initial_cash
        return (peak - current) / peak

    def _compute_volatility(self, window=20):
        """Annualized volatility of the last `window` equity returns."""
        if len(self.equity_curve) < window:
            return 0
        returns = np.diff(self.equity_curve[-window:]) / self.equity_curve[-window:-1]
        return np.std(returns) * np.sqrt(252)
动态风险调整
class DynamicRiskAdjustment:
    """Adjust risk-taking to the current market regime.

    Signals considered: realized volatility and trend strength (a
    VIX-like index or a tail-risk measure could slot in the same way).
    """

    def __init__(self):
        self.base_risk = 1.0  # neutral baseline multiplier

    def get_risk_multiplier(self, market_state):
        """Return a risk multiplier in [0.5, 1.5] for the given state."""
        vol = market_state.get("volatility", 0.15)
        trend_strength = market_state.get("trend", 0)
        # Calm market with a clear trend: lever up.
        if vol < 0.12 and abs(trend_strength) > 0.05:
            return min(1.5, self.base_risk * 1.5)
        # Stressed market: cut risk.
        if vol > 0.25:
            return max(0.5, self.base_risk * 0.5)
        # Otherwise stay at the baseline.
        return self.base_risk

    def adjust_position_size(self, base_size, market_state):
        """Scale a base position size by the regime risk multiplier."""
        risk_mult = self.get_risk_multiplier(market_state)
        return base_size * risk_mult
总结与最佳实践
RL 交易检查清单
□ 数据质量
□ 覆盖多种市场环境
□ 足够的历史数据
□ 包含必要的特征
□ 环境设计
□ 状态空间合理
□ 动作空间符合交易逻辑
□ 奖励函数与目标一致
□ 考虑交易成本和滑点
□ 训练流程
□ 时间序列切分
□ 验证集监控
□ 多次随机种子
□ 早停机制
□ 风险管理
□ 回撤限制
□ 持仓上限
□ 波动率控制
□ 流动性约束
□ 评估方法
□ 样本外测试
□ 滚动窗口验证
□ 与基准比较
□ 压力测试
常见问题与解决方案
| 问题 | 解决方案 |
|---|---|
| 训练不稳定 | 1. 降低学习率 2. 使用归一化 3. 增加批大小 |
| 过拟合 | 1. 增加数据多样性 2. 使用正则化 3. 早停 |
| 样本外差 | 1. 更保守的策略 2. 集成方法 3. 在线微调 |
| 奖励黑客 | 1. 仔细设计奖励函数 2. 使用奖励归一化 |
| 训练慢 | 1. 使用向量化环境 2. 增加并行度 3. 使用 GPU |
推荐学习路径
-
基础(1-2周)
- 完成 Part 1 的理论和代码
- 运行简单的 DQN/PPO 示例
-
实践(2-4周)
- 实现自己的交易环境
- 训练第一个交易智能体
- 完成完整回测
-
进阶(1-2月)
- 研究 Offline RL
- 尝试 Multi-Agent
- 优化风险管理
-
实战(持续)
- 纸上交易验证
- 小资金试运行
- 持续监控和调整
延伸阅读
-
论文:
- “Deep Reinforcement Learning for Trading” (Financial Times)
- “Offline Reinforcement Learning” (Levine et al.)
- “Multi-Agent Reinforcement Learning” (Tampuu et al.)
-
代码库:
- stable-baselines3
- d3rlpy
- rl-transer
-
社区:
- r/reinforcementlearning
- Quantopian(已关闭,但论坛有历史资料)
- Kaggle 竞赛
恭喜! 你已经完成了强化学习交易模块的学习。记住,RL 在量化交易中是一个强大的工具,但需要谨慎使用。从简单开始,逐步迭代,始终重视风险管理。