04-最优执行理论 (Optimal Execution)
预计学习时间:2 小时
难度:⭐⭐⭐⭐
核心问题:手里有一笔大单,怎么拆分执行才能少亏钱?
从一个直觉出发
假设你的量化模型告诉你:“买入 100 万股某只股票。”
你直接下一个市价单?那价格会立刻被推上去,你的成交均价远高于决策时的价格。这就是价格冲击。
那怎么办?分拆成小单慢慢买?
但分得太慢,市场可能在涨,你越买越贵,这是市场风险。
执行的两难困境:
一次性买完 慢慢买
───────── ─────────
冲击成本大 冲击成本低
市场风险小 市场风险大
→ 成交均价差 → 买得越来越贵
你需要在"冲击"和"风险"之间找到最优平衡点
最优执行理论就是研究如何在这个两难困境中找到最优拆分方案。
一、执行问题概述
1.1 为什么执行很重要
在量化研究的收益链条中,执行是最后一环,也是最容易被忽视的一环。
量化收益的侵蚀链条:
模型预期 Alpha: 100%
──────────────────────────
- 信号延迟: -5% (从信号产生到下单)
- 价格冲击: -20% (大单推动价格)
- 买卖价差: -5% (每笔交易的隐形成本)
- 市场时机: -10% (执行期间的价格变动)
- 手续费/税费: -5% (显性成本)
──────────────────────────
实际捕获 Alpha: 55% ← 接近一半被吃掉了
如果执行质量提高 10 个百分点,相当于模型 Alpha 提高 10 个百分点。 执行优化和 Alpha 研究一样有价值。
1.2 交易成本的分类
| 类型 | 特点 | 例子 |
|---|---|---|
| 显性成本 | 可直接观察和度量 | 佣金、印花税、过户费 |
| 隐性成本 | 难以直接观察,但影响更大 | 价格冲击、滑点、时机成本、机会成本 |
| 机会成本 | 未成交部分的市场机会 | 想买 100 万股但只买到 80 万股 |
隐性成本通常是显性成本的 5-10 倍。 这就是为什么需要最优执行理论。
# 典型执行成本分解
total_notional = 1000000 # 100 万元
execution_cost_breakdown = {
'佣金': total_notional * 0.0003, # 万三
'印花税': total_notional * 0.001, # 千一(卖出)
'市场冲击': total_notional * 0.002, # 20bp(大单)
'滑点': total_notional * 0.0005, # 5bp
'择时成本': total_notional * 0.001, # 10bp
}
explicit = execution_cost_breakdown['佣金'] + execution_cost_breakdown['印花税']
implicit = sum(v for k, v in execution_cost_breakdown.items()
if k not in ['佣金', '印花税'])
print(f"显性成本: {explicit/total_notional*100:.3f}%")
print(f"隐性成本: {implicit/total_notional*100:.3f}%")
print(f"隐性/显性: {implicit/explicit:.1f} 倍")
# 显性成本: 0.130%
# 隐性成本: 0.350%
# 隐性/显性: 2.7 倍1.3 价格冲击的两种类型
理解最优执行的第一步,是区分两种完全不同的价格冲击:
永久冲击(Permanent Impact):你的交易向市场传递了信息。市场认为”有大资金在买,可能有什么利好”,所以价格永久性地抬高了。
临时冲击(Temporary Impact):你的大单吃掉了订单簿上的挂单。当你的单子消化完之后,新的挂单补充上来,价格会回到原来的水平附近。
永久冲击 vs 临时冲击:
价格
↑
│ /───────── ← 永久冲击(信息效应)
│ /
│ /\ / ← 临时冲击(流动性消耗)
│ / \/
│/
└──────────────────→ 时间
买入执行区间
永久冲击:价格永久抬高了(信息被市场消化)
临时冲击:执行期间价格先涨后回落(流动性恢复)
这个区分至关重要,因为两种冲击的最优应对策略完全不同。
二、Almgren-Chriss 模型
2.1 模型设定
Almgren & Chriss (2001) 提出的框架是最优执行领域最经典的理论。
设定:
- 你需要在一个固定时间窗口 内买入/卖出总量为 的资产
- 将时间离散化为 个等间隔的时间步,
- 设 为第 个时间步的交易量,
- 设 为第 步开始时的剩余持仓:
价格模型:
其中:
- :随机波动()
- :临时冲击函数
执行后的实际成交价:
其中 是临时冲击(成交时额外多付/少收的价格)。
2.2 线性冲击假设
最常用的简化假设是冲击与交易速率成线性关系:
其中 是永久冲击系数, 是临时冲击系数, 是交易速率。
关键区别:
- :交易越快,价格永久偏离越多
- :每步交易量越大,成交价越差
2.3 目标函数
Almgren-Chriss 模型最小化的是风险调整后的执行成本:
其中 是风险厌恶参数。
期望成本(主要由临时冲击驱动):
成本方差(由市场随机波动驱动):
直觉:第一项惩罚单步交易量太大(临时冲击),第二项惩罚剩余持仓太多(市场风险)。
2.4 最优执行轨迹
Almgren-Chriss 模型的核心结果:最优执行轨迹是一条抛物线,由风险厌恶参数 控制。
解析解为:
其中 是”执行速度”参数。
当 (风险中性)时,,退化为均匀拆分。 当 (极端风险厌恶)时,趋向于一次性全部执行。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# ============================================================
# Almgren-Chriss 最优执行模型
# ============================================================
class AlmgrenChriss:
"""Almgren-Chriss 最优执行模型"""
def __init__(self, total_shares, N, T, sigma, eta, gamma, risk_aversion):
"""
total_shares: 总交易量
N: 时间步数
T: 总时间窗口
sigma: 每期波动率
eta: 永久冲击系数
gamma: 临时冲击系数
risk_aversion: 风险厌恶参数
"""
self.X = total_shares
self.N = N
self.T = T
self.tau = T / N
self.sigma = sigma
self.eta = eta
self.gamma = gamma
self.lam = risk_aversion
def solve(self):
"""求解最优执行轨迹(解析解)"""
X, N, T = self.X, self.N, self.T
sigma, eta, gamma, lam = self.sigma, self.eta, self.gamma, self.lam
tau = self.tau
kappa = np.sqrt(lam * sigma**2 / (2 * gamma))
# 时间网格
times = np.arange(N) * tau # t_1, t_2, ..., t_N
if kappa * T < 1e-8:
# 退化为均匀分配(VWAP)
self.trades = np.ones(N) * X / N
else:
# 解析解
self.trades = (X * kappa
* np.cosh(kappa * (T - times))
/ np.sinh(kappa * T) * tau)
# 剩余持仓
self.holdings = np.zeros(N + 1)
self.holdings[0] = X
for k in range(N):
self.holdings[k + 1] = self.holdings[k] - self.trades[k]
# 确保数值精确(最后一步清零)
self.trades[-1] = self.holdings[-2]
self.holdings[-1] = 0
# 计算成本
self.expected_cost = gamma / tau * np.sum(self.trades**2)
self.cost_variance = sigma**2 * tau * np.sum(self.holdings[:-1]**2)
self.permanent_cost = 0.5 * eta * X**2
return self.trades, self.holdings
def simulate(self, n_sims=1000):
"""蒙特卡洛模拟执行结果"""
X, N, T = self.X, self.N, self.T
sigma, eta, gamma = self.sigma, self.eta, self.gamma
tau = self.tau
trades, holdings = self.trades, self.holdings
all_costs = []
all_prices = []
for _ in range(n_sims):
S0 = 100.0
prices = [S0]
total_cost = 0
for k in range(N):
# 市场随机波动 + 永久冲击
noise = np.random.normal(0, 1)
price_change = sigma * np.sqrt(tau) * noise + eta * trades[k]
new_price = prices[-1] + price_change
# 实际成交价 = 市场价 + 临时冲击
exec_price = new_price + gamma * trades[k] / tau
total_cost += (exec_price - S0) * trades[k]
prices.append(exec_price)
all_costs.append(total_cost)
all_prices.append(prices)
return np.array(all_costs), np.array(all_prices)
# ============================================================
# 不同风险厌恶下的最优策略对比
# ============================================================
total_shares = 100000
N = 20
T = 1.0
sigma = 0.02
eta = 2.5e-7
gamma = 2.5e-6
risk_params = [1e-8, 1e-6, 1e-4, 1e-2]
labels = ['极低(风险中性近似)', '低', '中', '高(极度保守)']
colors = ['green', 'blue', 'orange', 'red']
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
for lam, label, color in zip(risk_params, labels, colors):
model = AlmgrenChriss(total_shares, N, T, sigma, eta, gamma, lam)
trades, holdings = model.solve()
axes[0].plot(trades / total_shares * 100, 'o-', label=label,
linewidth=2, color=color)
axes[1].plot(holdings / total_shares * 100, 's-', label=label,
linewidth=2, color=color)
axes[0].set_xlabel('时间步')
axes[0].set_ylabel('每步交易量占比 (%)')
axes[0].set_title('不同风险厌恶下的交易轨迹')
axes[0].legend(fontsize=8)
axes[0].grid(True, alpha=0.3)
axes[1].set_xlabel('时间步')
axes[1].set_ylabel('剩余持仓占比 (%)')
axes[1].set_title('不同风险厌恶下的持仓路径')
axes[1].legend(fontsize=8)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()2.5 关键参数的敏感性
| 参数 | 含义 | 增大时的效果 |
|---|---|---|
| 资产波动率 | 波动大 → 市场风险大 → 应该更快执行 | |
| 永久冲击系数 | 永久冲击大 → 信息泄露严重 → 应该更慢执行 | |
| 临时冲击系数 | 临时冲击大 → 单笔成本高 → 应该更均匀拆分 | |
| 风险厌恶 | 越怕风险 → 越倾向于快速完成 |
# 敏感性分析:波动率对最优策略的影响
sigma_range = [0.01, 0.02, 0.03, 0.05]
fig, ax = plt.subplots(figsize=(10, 6))
for sig in sigma_range:
model = AlmgrenChriss(total_shares, N, T, sig, eta, gamma, 1e-6)
trades, holdings = model.solve()
ax.plot(holdings / total_shares * 100, 'o-', linewidth=2,
label=f'sigma={sig}')
ax.set_xlabel('时间步')
ax.set_ylabel('剩余持仓 (%)')
ax.set_title('波动率越高,执行越应该前重(减少市场暴露)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()三、交易前沿(Trading Frontier)
3.1 从均方差到帕累托前沿
Almgren-Chriss 模型的一个优美性质:最优执行策略可以用一条帕累托前沿来刻画。
就像 Markowitz 的均值方差前沿一样,交易前沿刻画了:
- 横轴:执行成本的标准差(执行风险)
- 纵轴:预期执行成本(平均成本)
交易前沿(Trading Frontier):
执行成本
↑
│ /────────────── ← 高风险策略(激进执行)
│ / 有效前沿
│ /
│ /
│ /
│ /
│ / ← 低风险策略(保守执行)
│ /
│ /
│ /
└──────────────────────────→ 执行成本波动率
快速执行:成本低但波动大(临时冲击小,但市场风险大)
慢速执行:成本高但波动小(临时冲击大,但市场风险小)
3.2 数学表达
最优执行策略的预期成本和成本方差之间存在二次关系:
其中 是最小成本边界(即使风险中性也必须支付的最低成本,即永久冲击成本)。
3.3 与 Markowitz 有效前沿的类比
| 维度 | Markowitz 有效前沿 | 交易前沿 |
|---|---|---|
| 横轴 | 组合收益波动率 | 执行成本波动率 |
| 纵轴 | 预期收益 | 预期执行成本(取反) |
| 优化目标 | 最大化风险调整后收益 | 最小化风险调整后成本 |
| 决定参数 | 风险厌恶系数 | 风险厌恶系数 |
| 约束 | 预算约束、权重之和=1 | 总交易量固定 |
| 含义 | 选什么资产 | 怎么执行交易 |
类比:Markowitz 回答”买什么”,交易前沿回答”怎么买”。两者是一体两面。
# 计算交易前沿
lam_range = np.logspace(-8, -2, 50)
costs = []
risks = []
for lam_val in lam_range:
model = AlmgrenChriss(total_shares, N, T, sigma, eta, gamma, lam_val)
trades, holdings = model.solve()
costs.append(model.expected_cost)
risks.append(np.sqrt(model.cost_variance))
costs = np.array(costs)
risks = np.array(risks)
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(risks, costs, 'b-', linewidth=2)
ax.scatter(risks[0], costs[0], c='red', s=100, zorder=5,
label='激进(快速执行)')
ax.scatter(risks[-1], costs[-1], c='green', s=100, zorder=5,
label='保守(慢速执行)')
ax.set_xlabel('执行成本标准差')
ax.set_ylabel('预期执行成本')
ax.set_title('Almgren-Chriss 交易前沿')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()四、VWAP / TWAP 算法
4.1 TWAP(时间加权平均价格)
原理:将总交易量均匀分配到每个时间间隔。
适用场景:
- 交易量稳定的市场
- 不想暴露太多信息的交易
- 简单、易于实现
优点:实现简单,不依赖历史成交量模式,执行轨迹可预测。 缺点:不考虑成交量的日内模式,在成交量低的时段可能造成更大的冲击。
4.2 VWAP(成交量加权平均价格)
原理:按照历史成交量分布来分配交易量。
其中 是第 个时间步的预期成交量(通常用历史同时间段的成交量估计)。
适用场景:
- 成交量日内模式明显的市场(如 A 股的 U 形曲线)
- 流动性较好的股票
- 标准化的大额执行
def generate_vwap_schedule(X, n_steps, volume_pattern=None):
"""
生成 VWAP 执行计划
参数:
X: 总交易量
n_steps: 时间步数
volume_pattern: 成交量分布(归一化),若为 None 则使用 U 形模式
"""
if volume_pattern is None:
# 典型的日内成交量 U 形模式
t = np.linspace(0, np.pi, n_steps)
volume_pattern = 0.5 + 0.5 * np.sin(t)
volume_pattern = np.array(volume_pattern) / np.sum(volume_pattern)
trades = X * volume_pattern
holdings = np.zeros(n_steps + 1)
holdings[0] = X
for k in range(n_steps):
holdings[k + 1] = holdings[k] - trades[k]
return trades, holdings
def generate_twap_schedule(X, n_steps):
"""生成 TWAP 执行计划"""
trades = np.ones(n_steps) * X / n_steps
holdings = np.zeros(n_steps + 1)
holdings[0] = X
for k in range(n_steps):
holdings[k + 1] = holdings[k] - trades[k]
return trades, holdings
# 对比 VWAP 和 TWAP
n_steps = 20
vwap_trades, vwap_holdings = generate_vwap_schedule(total_shares, n_steps)
twap_trades, twap_holdings = generate_twap_schedule(total_shares, n_steps)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].bar(range(n_steps), vwap_trades / 10000, alpha=0.6,
label='VWAP', color='blue')
axes[0].bar(range(n_steps), twap_trades / 10000, alpha=0.4,
label='TWAP', color='orange')
axes[0].set_xlabel('时间步')
axes[0].set_ylabel('交易量(万股)')
axes[0].set_title('VWAP vs TWAP 交易量分配')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[1].plot(vwap_holdings / total_shares * 100, 'b-o',
label='VWAP', linewidth=2)
axes[1].plot(twap_holdings / total_shares * 100, 'orange',
marker='s', label='TWAP', linewidth=2)
axes[1].set_xlabel('时间步')
axes[1].set_ylabel('剩余持仓 (%)')
axes[1].set_title('VWAP vs TWAP 持仓路径')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()4.3 POV(按成交量比例执行)
原理:不是按固定计划执行,而是根据实时市场成交量来调整。
其中 POV 是一个比例参数(如 10%、20%)。
def pov_algorithm(target_shares, market_volume_series, participation_rate=0.1):
"""POV 参与率算法
target_shares: 目标交易量
market_volume_series: 每期市场成交量序列
participation_rate: 参与率(0-1)
"""
remaining = target_shares
trades = []
for t, mkt_vol in enumerate(market_volume_series):
if remaining <= 0:
trades.append(0)
continue
# 按比例交易
trade_vol = min(remaining, mkt_vol * participation_rate)
# 自适应:如果时间过半但剩余量大,提高参与率
progress = (t + 1) / len(market_volume_series)
if progress > 0.6 and remaining / target_shares > 0.5:
trade_vol = min(remaining, mkt_vol * participation_rate * 1.5)
trades.append(trade_vol)
remaining -= trade_vol
return np.array(trades), remaining4.4 与最优执行的对比
| 维度 | TWAP | VWAP | POV | 最优执行 |
|---|---|---|---|---|
| 核心逻辑 | 均匀分拆 | 按成交量分拆 | 按实时成交量分拆 | 优化风险-成本权衡 |
| 市场适应性 | 无 | 弱(基于历史) | 强 | 强 |
| 最优性 | 通常次优 | 通常次优 | 次优 | 理论最优 |
| 实现复杂度 | 极低 | 低 | 中 | 高 |
| 参数估计 | 无 | 需要成交量预测 | 需要 POV 参数 | 需要冲击系数和波动率 |
| 信息泄露 | 固定模式 | 固定模式 | 不固定 | 不固定 |
关键认知:VWAP/TWAP 是启发式算法,不是最优解。它们没有显式地平衡”冲击”和”风险”。在简单的场景下表现尚可,但在复杂市场条件下可能远逊于最优执行。
五、动态规划方法
5.1 为什么需要动态规划
Almgren-Chriss 模型假设冲击函数和波动率是已知的、不变的。但现实市场是动态的:
- 市场波动率在变(开盘大、中午小)
- 订单簿深度在变
- 竞争对手的行为在变
- 你已经执行的交易会影响后续的冲击
动态规划方法可以在每一步根据当前市场状态重新计算最优决策。
5.2 离散时间动态规划
将执行问题建模为一个动态规划问题:
状态变量:
- :剩余持仓量
决策变量:
- :当前步的交易量
最优值函数:
从最后一步倒推:
- 终端条件:(执行结束)
- 最后一步:必须把剩余的全部执行完
- 倒数第二步:在”现在买”和”留到最后买”之间权衡
- …依次倒推到第一步
def dp_optimal_execution(X, N, sigma, eta, gamma, lam):
"""
动态规划求解最优执行
状态变量:剩余持仓 x
决策变量:本期交易量 n(0 <= n <= x)
价值函数:V_k(x) = 从状态 x 开始到 T 时刻的最小期望成本
"""
# 离散化持仓空间
n_levels = 101
x_grid = np.linspace(0, X, n_levels)
# 价值函数初始化(终端条件)
V_next = np.zeros(n_levels)
all_policies = {}
# 从最后一步倒推
for k in range(N, 0, -1):
V_current = np.zeros(n_levels)
policy_k = np.zeros(n_levels)
for i in range(n_levels):
x = x_grid[i]
if x < X / N * 0.01: # 剩余量很小
V_current[i] = 0
policy_k[i] = 0
continue
# 在可行交易量范围内搜索最优
min_val = np.inf
best_n = x
# 候选交易量
n_candidates = np.linspace(max(0, x / N * 0.1),
min(x, x / N * 3), 30)
for n in n_candidates:
if n > x:
break
# 临时冲击成本
temp_cost = gamma * n**2 / (X / N)
# 风险成本(剩余持仓的方差)
future_x = x - n
risk_cost = lam * sigma**2 * future_x**2
# 未来价值函数(插值)
j_future = np.argmin(np.abs(x_grid - future_x))
future_val = V_next[j_future]
total = temp_cost + risk_cost + future_val
if total < min_val:
min_val = total
best_n = n
V_current[i] = min_val
policy_k[i] = best_n
V_next = V_current
all_policies[k] = policy_k
# 正向执行
trades = []
x_current = X
for k in range(1, N + 1):
j = np.argmin(np.abs(x_grid - x_current))
n = all_policies[k][j]
n = min(n, x_current)
trades.append(n)
x_current -= n
return np.array(trades)
# 对比动态规划与 Almgren-Chriss
dp_trades = dp_optimal_execution(total_shares, N, sigma, eta, gamma, 1e-6)
dp_holdings = np.zeros(N + 1)
dp_holdings[0] = total_shares
for k in range(N):
dp_holdings[k + 1] = dp_holdings[k] - dp_trades[k]
ac_model = AlmgrenChriss(total_shares, N, T, sigma, eta, gamma, 1e-6)
ac_trades, ac_holdings = ac_model.solve()
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(dp_holdings / total_shares * 100, 'b-o',
label='动态规划', linewidth=2)
ax.plot(ac_holdings / total_shares * 100, 'r-s',
label='Almgren-Chriss 解析解', linewidth=2)
ax.set_xlabel('时间步')
ax.set_ylabel('剩余持仓 (%)')
ax.set_title('动态规划 vs Almgren-Chriss 执行路径对比')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()5.3 动态规划的扩展方向
| 扩展方向 | 内容 |
|---|---|
| 时变参数 | 波动率、冲击系数随时间变化 |
| 多资产 | 同时执行多个资产,考虑相关性 |
| 订单簿约束 | 利用实时订单簿深度信息 |
| 竞争对手建模 | 考虑其他算法交易者的行为 |
| 随机控制 | 框架更一般的连续时间版本 |
六、交易成本分析 (TCA)
6.1 TCA 的两个面向
TCA(Transaction Cost Analysis)回答两个问题:
TCA 的两个面向:
事前(Ex-ante) 事后(Ex-post)
────────────── ──────────────
"预计要花多少钱?" "实际花了多少钱?"
→ 用于决策: → 用于改进:
是否交易? 算法是否有效?
用什么算法? 哪些成本可以降低?
执行多少量? 下次怎么做得更好?
6.2 Implementation Shortfall
Implementation Shortfall 是最常用的 TCA 指标:
其中 是投资决策时的价格, 是平均成交价。
对于买入操作,IS 为正表示你买贵了(相对于决策价格)。
IS 可以进一步分解为:
6.3 事后归因分析
import numpy as np
import pandas as pd
def transaction_cost_attribution(prices, trades, decision_price=None):
"""
事后交易成本归因
参数:
prices: 每步执行时的市场价格
trades: 每步交易量(正=买入,负=卖出)
decision_price: 基准价格(决策时价格)
返回:
attribution: 各成本分项
"""
n_steps = len(trades)
if decision_price is None:
decision_price = prices[0]
total_volume = np.sum(np.abs(trades))
avg_exec_price = np.sum(prices * trades) / np.sum(trades)
# Implementation Shortfall
is_total = decision_price - avg_exec_price
is_bps = is_total / decision_price * 10000
# 成本分解
half_spread = 0.001 # 假设半价差
spread_cost = half_spread * total_volume * decision_price
impact_cost = (avg_exec_price - prices[0]) * np.sum(trades) - spread_cost
timing_cost = (decision_price - prices[0]) * np.sum(trades)
return {
'implementation_shortfall_bps': is_bps,
'avg_exec_price': avg_exec_price,
'decision_price': decision_price,
'spread_cost_bps': spread_cost / (decision_price * total_volume) * 10000,
'impact_cost_bps': impact_cost / (decision_price * total_volume) * 10000,
'timing_cost_bps': timing_cost / (decision_price * total_volume) * 10000,
'total_volume': total_volume
}
# 模拟执行过程
np.random.seed(42)
n_exec_steps = 20
true_prices = [100.0]
for _ in range(n_exec_steps):
ret = np.random.normal(0.0003, 0.01)
true_prices.append(true_prices[-1] * (1 + ret))
true_prices = np.array(true_prices[:-1])
exec_schedule = np.ones(n_exec_steps) * total_shares / n_exec_steps
# 模拟冲击效应
exec_prices = true_prices + np.cumsum(np.random.normal(0, 0.02, n_exec_steps))
result = transaction_cost_attribution(exec_prices, exec_schedule, decision_price=100.0)
print(f"=== 交易成本归因 ===")
for key, val in result.items():
if 'bps' in key:
print(f" {key}: {val:.2f} bp")
else:
print(f" {key}: {val}")6.4 成本可控性分析
| 成本分量 | 驱动因素 | 可控性 |
|---|---|---|
| 显性成本 | 佣金、税费 | 可谈判 |
| 价差成本 | 买卖价差 | 部分可控(选择执行时机) |
| 临时冲击 | 单笔交易量 | 可控(分拆策略) |
| 永久冲击 | 总交易量和信息含量 | 部分可控(执行速度) |
| 择时成本 | 执行期间的市场变动 | 部分可控(加速/减速) |
| 机会成本 | 未成交部分 | 部分可控(限价单策略) |
七、冲击模型的进阶:线性 vs 平方根
7.1 两种冲击函数假设
Almgren-Chriss 默认使用线性冲击:。
但大量实证研究发现,冲击更接近平方根模型:
其中 是日均成交量。
# 对比线性 vs 平方根冲击模型
v_daily = 1000000
sigma = 0.02
eta = 0.1
trade_fractions = np.linspace(0.01, 0.5, 100)
trade_sizes = trade_fractions * v_daily
linear_impact = eta * trade_sizes / v_daily * sigma
sqrt_impact = eta * sigma * np.sqrt(trade_sizes / v_daily)
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(trade_fractions, linear_impact * 10000, '--', label='线性模型')
ax.plot(trade_fractions, sqrt_impact * 10000, '-', label='平方根模型')
ax.set_xlabel('交易量 / 日均成交量')
ax.set_ylabel('冲击成本 (bp)')
ax.set_title('线性 vs 平方根冲击模型')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()
# 大单时差异更明显
for frac in [0.1, 0.3, 0.5]:
idx = int(frac / 0.5 * 99)
print(f"{int(frac*100)}% ADV: 线性={linear_impact[idx]*10000:.1f}bp, "
f"根号={sqrt_impact[idx]*10000:.1f}bp")10% ADV: 线性=2.0bp, 根号=6.3bp
30% ADV: 线性=6.0bp, 根号=11.0bp
50% ADV: 线性=10.0bp, 根号=14.1bp
直觉:平方根模型更符合实际——大单的边际冲击递减,因为市场有更多的参与者来吸收你的订单。
八、完整执行模拟
下面用一个完整的例子,把 Almgren-Chriss 模型、执行算法、成本归因整合起来。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# ============================================================
# 场景:需要在 1 天内买入 100 万股
# ============================================================
X = 1000000
T = 1.0
N = 20
sigma = 0.02
eta = 2.5e-7
gamma = 2.5e-6
lam = 1e-6
# ============================================================
# 求解最优策略
# ============================================================
model = AlmgrenChriss(X, N, T, sigma, eta, gamma, lam)
trades, holdings = model.solve()
# 蒙特卡洛模拟
costs_all, prices_all = model.simulate(n_sims=1000)
# TWAP 对比
twap_trades, twap_holdings = generate_twap_schedule(X, N)
twap_costs = []
for _ in range(1000):
S0 = 100.0
price = S0
total_cost = 0
for k in range(N):
noise = np.random.normal(0, 1)
price += sigma * np.sqrt(T/N) * noise + eta * twap_trades[k]
exec_price = price + gamma * twap_trades[k] / (T/N)
total_cost += (exec_price - S0) * twap_trades[k]
twap_costs.append(total_cost)
twap_costs = np.array(twap_costs)
# ============================================================
# 可视化
# ============================================================
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 价格路径
axes[0, 0].plot(np.median(prices_all, axis=0), 'b-', linewidth=2,
label='中位数价格')
axes[0, 0].fill_between(
range(N + 1),
np.percentile(prices_all, 25, axis=0),
np.percentile(prices_all, 75, axis=0),
alpha=0.2, color='blue', label='25-75 分位')
axes[0, 0].set_title('最优执行下的价格路径')
axes[0, 0].set_xlabel('时间步')
axes[0, 0].set_ylabel('价格')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 交易量分配
axes[0, 1].bar(range(N), trades / 10000, alpha=0.6,
label='最优执行', color='blue')
axes[0, 1].bar(range(N), twap_trades / 10000, alpha=0.4,
label='TWAP', color='orange')
axes[0, 1].set_title('交易量分配对比')
axes[0, 1].set_xlabel('时间步')
axes[0, 1].set_ylabel('交易量(万股)')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 成本分布对比
axes[1, 0].hist(costs_all, bins=50, alpha=0.5, label='最优执行',
density=True, color='blue')
axes[1, 0].hist(twap_costs, bins=50, alpha=0.5, label='TWAP',
density=True, color='orange')
axes[1, 0].axvline(np.mean(costs_all), color='blue', linestyle='--',
label=f'最优均值={np.mean(costs_all):.0f}')
axes[1, 0].axvline(np.mean(twap_costs), color='orange', linestyle='--',
label=f'TWAP均值={np.mean(twap_costs):.0f}')
axes[1, 0].set_title('执行成本分布对比')
axes[1, 0].set_xlabel('执行成本')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 成本分项
categories = ['永久冲击', '临时冲击', '市场风险(波动率惩罚)']
values = [model.permanent_cost, model.expected_cost, model.cost_variance]
axes[1, 1].bar(categories, values, color=['red', 'blue', 'green'], alpha=0.7)
axes[1, 1].set_title('最优执行的成本分解')
axes[1, 1].set_ylabel('成本')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 统计汇总
print(f"=== 最优执行 vs TWAP ===")
print(f"最优执行 - 均值成本: {np.mean(costs_all):.0f}, "
f"标准差: {np.std(costs_all):.0f}")
print(f"TWAP - 均值成本: {np.mean(twap_costs):.0f}, "
f"标准差: {np.std(twap_costs):.0f}")
saving = (np.mean(twap_costs) - np.mean(costs_all)) / np.mean(twap_costs) * 100
print(f"成本节省: {saving:.1f}%")九、本文件核心要点
| 概念 | 要点 |
|---|---|
| 永久冲击 vs 临时冲击 | 永久冲击是信息效应,临时冲击是流动性消耗。两者应对策略不同 |
| Almgren-Chriss 模型 | 在冲击成本和市场风险之间权衡,得出最优拆分轨迹 |
| 交易前沿 | 执行策略的帕累托前沿,类似 Markowitz 有效前沿 |
| VWAP/TWAP | 简单但非最优的启发式算法。VWAP 适合成交量有规律的市场 |
| 动态规划 | 可以处理时变参数和更复杂的市场状态,但计算量更大 |
| TCA | 事前估计执行成本,事后归因分析成本来源,持续改进执行质量 |
| 冲击模型选择 | 线性模型简单但不够真实,平方根模型更符合实证 |
一句话总结:最优执行的核心是在”冲击”和”风险”之间找到平衡——交易太快会冲击价格,交易太慢会承担市场风险。Almgren-Chriss 模型给出了这个权衡的理论最优解,VWAP/TWAP 是简单但非最优的近似,动态规划提供了更灵活的框架。理解执行成本,对评估任何量化策略的真实收益都至关重要。
参考阅读:Almgren & Chriss (2001), “Optimal Execution of Portfolio Transactions”; Kissell (2013), “The Science of Algorithmic Trading and Portfolio Management”
→ 返回目录:市场微观结构