市场微观结构
核心问题: 你的策略信号很好,但实际交易时会不会被买卖价差和价格冲击吃掉利润?
学习目标
- 理解订单簿如何决定成交价格
- 掌握买卖价差的来源和度量方法
- 学习价格冲击模型和流动性指标
- 实现 VWAP/TWAP 执行算法
- 理解高频数据的特性与处理
- 核心: 交易成本是策略盈利的隐形杀手
一、订单簿 (Order Book) 结构
1.1 什么是订单簿?
订单簿是实时记录所有未成交买卖订单的数据库,反映了市场的供需状态。
订单簿结构示意
─────────────────
卖方 (Ask) 价格 买方 (Bid)
───────────────────────────────────────
100股 @ 10.05 ┃ 150股 @ 10.00 ←─ 最优买价 (Bid)
50股 @ 10.04 ┃ 200股 @ 9.99
200股 @ 10.03 ┃ 100股 @ 9.98
150股 @ 10.02 ←─ 最优卖价 (Ask) ┃ 300股 @ 9.97
───────────────────────────────────────
买卖价差 = 10.02 - 10.00 = 0.02
中间价 = (10.02 + 10.00) / 2 = 10.01
1.2 价格优先与时间优先
订单成交遵循两个原则:
| 优先级 | 原则 | 说明 |
|---|---|---|
| 1 | 价格优先 | 买入价高的优先,卖出价低的优先 |
| 2 | 时间优先 | 同价格情况下,下单时间早的优先 |
1.3 订单簿的 Python 模拟
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict
import heapq
class OrderBook:
    """Toy limit order book that matches by price, then by arrival order.

    Each side maps a price to a queue of resting orders stored as
    ``(order_id, quantity, timestamp)`` tuples. ``order_id`` grows with every
    submitted order, so keeping each queue ordered by its tuples serves
    orders at the same price first-come, first-served.
    """

    _VALID_SIDES = ('bid', 'ask')

    def __init__(self, tick_size=0.01):
        """Create an empty book.

        Args:
            tick_size: Minimum price increment. It is only stored; prices
                passed to ``add_limit_order`` are not rounded to it.
        """
        self.tick_size = tick_size
        self.bids = {}  # resting buys: {price: [(order_id, quantity, timestamp), ...]}
        self.asks = {}  # resting sells: {price: [(order_id, quantity, timestamp), ...]}
        self.order_id = 0
        self.trade_history = []  # every trade ever produced, in execution order

    def add_limit_order(self, side, price, quantity, timestamp=None):
        """Submit a limit order, matching it against the opposite side first.

        Args:
            side: ``'bid'`` (buy) or ``'ask'`` (sell).
            price: Limit price.
            quantity: Order size.
            timestamp: Order time; defaults to the number of trades recorded
                so far.

        Returns:
            List of trade dicts with keys ``price``, ``quantity``,
            ``timestamp``, ``bid_order_id`` and ``ask_order_id``. Trades
            execute at the resting order's price.

        Raises:
            ValueError: If ``side`` is neither ``'bid'`` nor ``'ask'``.
        """
        if side not in self._VALID_SIDES:
            raise ValueError(f"side must be 'bid' or 'ask', got {side!r}")
        if timestamp is None:
            timestamp = len(self.trade_history)

        self.order_id += 1
        incoming_id = self.order_id
        is_bid = side == 'bid'
        own, opposite = (self.bids, self.asks) if is_bid else (self.asks, self.bids)

        trades = []
        while quantity > 0 and opposite:
            best_price = min(opposite) if is_bid else max(opposite)
            crosses = price >= best_price if is_bid else price <= best_price
            if not crosses:
                break
            queue = opposite[best_price]
            while quantity > 0 and queue:
                resting_id, resting_qty, resting_time = queue[0]
                trade_qty = min(quantity, resting_qty)
                trades.append({
                    'price': best_price,
                    'quantity': trade_qty,
                    'timestamp': timestamp,
                    'bid_order_id': incoming_id if is_bid else resting_id,
                    'ask_order_id': resting_id if is_bid else incoming_id,
                })
                quantity -= trade_qty
                resting_qty -= trade_qty
                if resting_qty == 0:
                    heapq.heappop(queue)
                else:
                    # Same order_id, so replacing the head keeps the heap valid.
                    queue[0] = (resting_id, resting_qty, resting_time)
            if not queue:
                del opposite[best_price]

        if quantity > 0:
            # Rest only the UNFILLED remainder, not the original order size.
            heapq.heappush(own.setdefault(price, []),
                           (incoming_id, quantity, timestamp))

        self.trade_history.extend(trades)
        return trades

    @staticmethod
    def _level_quantity(queue):
        """Return the total resting quantity in one price-level queue."""
        return sum(order[1] for order in queue)

    def get_best_bid(self):
        """Return ``(price, total_quantity)`` of the highest bid, or ``(None, 0)``."""
        if not self.bids:
            return None, 0
        best_price = max(self.bids)
        return best_price, self._level_quantity(self.bids[best_price])

    def get_best_ask(self):
        """Return ``(price, total_quantity)`` of the lowest ask, or ``(None, 0)``."""
        if not self.asks:
            return None, 0
        best_price = min(self.asks)
        return best_price, self._level_quantity(self.asks[best_price])

    def get_spread(self):
        """Return best ask minus best bid, or ``None`` if either side is empty."""
        best_bid, _ = self.get_best_bid()
        best_ask, _ = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return best_ask - best_bid

    def get_mid_price(self):
        """Return the average of best bid and best ask, or ``None`` if a side is empty."""
        best_bid, _ = self.get_best_bid()
        best_ask, _ = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return (best_bid + best_ask) / 2

    def get_depth(self, n_levels=5):
        """Return the top ``n_levels`` price levels of each side.

        Returns:
            ``(bid_levels, ask_levels)``; each is a list of
            ``(price, total_quantity)``. Bids are sorted from highest to
            lowest price, asks from lowest to highest.
        """
        bid_levels = [
            (price, self._level_quantity(self.bids[price]))
            for price in sorted(self.bids, reverse=True)[:n_levels]
        ]
        ask_levels = [
            (price, self._level_quantity(self.asks[price]))
            for price in sorted(self.asks)[:n_levels]
        ]
        return bid_levels, ask_levels

    def visualize(self, n_levels=10):
        """Print the top ``n_levels`` of both sides followed by summary statistics.

        Row ``i`` shows the i-th best ask next to the i-th best bid, each
        with its own price.
        """
        bid_levels, ask_levels = self.get_depth(n_levels)
        print("\n" + "=" * 50)
        print("订单簿状态")
        print("=" * 50)
        print(f"{'卖量':<12} {'卖价 (Ask)':<12} {'买价 (Bid)':<12} {'买量':<12}")
        print("-" * 50)
        for i in range(max(len(bid_levels), len(ask_levels))):
            ask_qty_str = ask_price_str = bid_price_str = bid_qty_str = ""
            if i < len(ask_levels):
                ask_price, ask_qty = ask_levels[i]
                ask_qty_str = f"{ask_qty:>6}股"
                ask_price_str = f"{ask_price:.2f}"
            if i < len(bid_levels):
                bid_price, bid_qty = bid_levels[i]
                bid_qty_str = f"{bid_qty:>6}股"
                bid_price_str = f"{bid_price:.2f}"
            print(f"{ask_qty_str:<12} {ask_price_str:<12} {bid_price_str:<12} {bid_qty_str:<12}")

        best_bid, bid_qty = self.get_best_bid()
        best_ask, ask_qty = self.get_best_ask()
        spread = self.get_spread()
        mid = self.get_mid_price()
        print("-" * 50)
        print(f"最优买价: {best_bid} ({bid_qty}股)")
        print(f"最优卖价: {best_ask} ({ask_qty}股)")
        print(f"买卖价差: {spread} ({spread/mid*100:.2f}%)" if spread is not None else "买卖价差: N/A")
        print(f"中间价: {mid}")
        print("=" * 50)
# Order book simulation demo
print("=" * 60)
print("订单簿模拟示例")
print("=" * 60)
# Create an empty book with a 0.01 tick size
book = OrderBook(tick_size=0.01)
# Seed the book with three resting bids and three resting asks
print("\n【添加初始订单】")
book.add_limit_order('bid', 10.00, 150, timestamp=0)
book.add_limit_order('bid', 9.99, 200, timestamp=1)
book.add_limit_order('bid', 9.98, 100, timestamp=2)
book.add_limit_order('ask', 10.02, 150, timestamp=3)
book.add_limit_order('ask', 10.03, 200, timestamp=4)
book.add_limit_order('ask', 10.04, 100, timestamp=5)
book.visualize()
# Emulate a market buy with a limit order priced above every resting ask,
# so it executes immediately
print("\n【市价买入 300 股】")
trades = book.add_limit_order('bid', 10.10, 300, timestamp=6)  # high limit price guarantees a fill
print(f"成交了 {len(trades)} 笔:")
for trade in trades:
    print(f" 价格 {trade['price']}, 数量 {trade['quantity']}")
book.visualize()
# Submit a limit sell order
print("\n【限价卖出 100 股 @ 10.01】")
trades = book.add_limit_order('ask', 10.01, 100, timestamp=7)
if trades:
    print(f"成交了 {len(trades)} 笔")
else:
    print("未能成交,加入订单簿")
book.visualize()
# 1.4 订单簿深度分析
def analyze_order_book_depth(order_book, n_levels=10):
    """Summarise the depth of the top ``n_levels`` of an order book.

    Args:
        order_book: Object exposing ``get_depth(n_levels)``, ``get_spread()``
            and ``get_mid_price()`` (for example an ``OrderBook``).
        n_levels: Number of price levels per side to include.

    Returns:
        Dict with:
            bid_depth / ask_depth: total quantity over the inspected levels.
            imbalance: (bid - ask) / (bid + ask); positive means the bid side
                is heavier. 0.0 when both sides are empty.
            bid_slope / ask_slope: price range of the inspected levels divided
                by their total quantity; 0 when fewer than two levels exist.
            spread_pct: spread as a percentage of the mid price, or NaN when
                either side of the book is empty.
    """
    bid_levels, ask_levels = order_book.get_depth(n_levels)

    def _slope(levels, depth):
        # Price moved per unit of quantity consumed across the inspected levels.
        if len(levels) < 2 or depth == 0:
            return 0
        return abs(levels[0][0] - levels[-1][0]) / depth

    total_bid_qty = sum(qty for _, qty in bid_levels)
    total_ask_qty = sum(qty for _, qty in ask_levels)
    total_qty = total_bid_qty + total_ask_qty
    imbalance = (total_bid_qty - total_ask_qty) / total_qty if total_qty else 0.0

    spread = order_book.get_spread()
    mid_price = order_book.get_mid_price()
    if spread is None or not mid_price:
        # NaN (not None) so callers that format the value as a float still work.
        spread_pct = float('nan')
    else:
        spread_pct = (spread / mid_price) * 100

    return {
        'bid_depth': total_bid_qty,
        'ask_depth': total_ask_qty,
        'imbalance': imbalance,  # positive => bid side is heavier
        'bid_slope': _slope(bid_levels, total_bid_qty),
        'ask_slope': _slope(ask_levels, total_ask_qty),
        'spread_pct': spread_pct,
    }
# Run the depth analysis on the demo book
depth_features = analyze_order_book_depth(book)
print("\n【订单簿深度分析】")
print("-" * 50)
for feature, value in depth_features.items():
    # Imbalance and percentage metrics are printed with four decimals, the rest with two
    if 'imbalance' in feature or 'pct' in feature:
        print(f"{feature}: {value:.4f}")
    else:
        print(f"{feature}: {value:.2f}")
# 二、买卖价差 (Bid-Ask Spread)
2.1 价差的来源
买卖价差不是“免费的午餐”,而是对做市商的补偿:
| 来源 | 说明 | 影响因素 |
|---|---|---|
| 订单处理成本 | 交易所费用、清算费用、系统成本 | 交易量越大,单位成本越低 |
| 库存风险 | 做市商持有头寸面临价格波动 | 价格波动越大,价差越宽 |
| 信息不对称 | 与知情交易者交易的损失 | 信息不对称越严重,价差越宽 |
2.2 价差的度量
def calculate_spread_metrics(prices, volumes=None):
    """Compute summary statistics of the quoted bid-ask spread.

    Args:
        prices: DataFrame with ``bid`` and ``ask`` price columns.
        volumes: Optional DataFrame with ``bid`` and ``ask`` quantity columns.

    Returns:
        Dict with the mean and standard deviation of the absolute spread and
        of the relative spread (a fraction of the mid price), plus the mean
        log spread. When ``volumes`` is given, ``weighted_mid_price`` holds
        the mean depth-weighted mid price.
    """
    bid, ask = prices['bid'], prices['ask']

    quoted = ask - bid
    relative = quoted / ((bid + ask) / 2)
    log_ratio = np.log(ask / bid)

    metrics = {}
    metrics['absolute_spread_mean'] = quoted.mean()
    metrics['absolute_spread_std'] = quoted.std()
    metrics['relative_spread_mean'] = relative.mean()
    metrics['relative_spread_std'] = relative.std()
    metrics['log_spread_mean'] = log_ratio.mean()

    if volumes is None:
        return metrics

    # Each price is weighted by the quantity on the OPPOSITE side, so the mid
    # leans toward the side with less resting size.
    depth = volumes['bid'] + volumes['ask']
    cross_weighted = bid * volumes['ask'] + ask * volumes['bid']
    metrics['weighted_mid_price'] = (cross_weighted / depth).mean()
    return metrics


# 2.3 Roll 价差估计器
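当我们只有成交价格数据时,可以用 Roll (1984) 模型估计价差:
$$\Delta p_t = c\,\Delta q_t + \sigma\,\varepsilon_t, \qquad q_t \in \{+1, -1\}$$
其中 $c$ 是半价差,$\sigma$ 是真实价格波动率,$q_t$ 是交易方向(买入为 $+1$,卖出为 $-1$,各期独立且等概率)。
推导出:
$$\mathrm{Cov}(\Delta p_t, \Delta p_{t-1}) = -c^2 \quad\Rightarrow\quad S = 2c = 2\sqrt{-\mathrm{Cov}(\Delta p_t, \Delta p_{t-1})}$$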
def roll_spread_estimator(price_changes):
    """Estimate the effective bid-ask spread with Roll's (1984) estimator.

    Uses only trade-price changes: ``spread = 2 * sqrt(-cov)``, where ``cov``
    is the first-order autocovariance of the changes.

    Args:
        price_changes: Series (or array-like) of consecutive trade-price
            changes. Passing returns instead yields a relative spread.

    Returns:
        The estimated full spread, in the same units as ``price_changes``, or
        ``None`` when the autocovariance is non-negative or undefined (too few
        observations), in which case the estimator does not exist.
    """
    changes = pd.Series(price_changes).dropna().reset_index(drop=True)

    # Autocovariance, not autocorrelation: the spread must keep price units.
    cov_lag1 = changes.cov(changes.shift(1))

    if not np.isfinite(cov_lag1) or cov_lag1 >= 0:
        return None

    # -cov equals the squared HALF spread, so the full spread is twice its root.
    return 2 * np.sqrt(-cov_lag1)
# Test the Roll estimator on simulated trade prices
np.random.seed(42)
n_obs = 1000
# True quoted spread
true_spread = 0.02
# Efficient (mid) price: a random walk starting at 100
mid_price = 100 + np.cumsum(np.random.normal(0, 0.01, n_obs))
# Roll's model assumes each trade is independently a buy (+1, at the ask)
# or a sell (-1, at the bid) with equal probability
trade_sign = np.random.choice([1, -1], n_obs)
observed_prices = pd.Series(mid_price + trade_sign * true_spread / 2)
price_changes = observed_prices.diff().dropna()
# Estimate the spread from trade-price changes only
estimated_spread = roll_spread_estimator(price_changes)
print("\n【Roll 价差估计】")
print("-" * 40)
print(f"真实价差: {true_spread:.4f}")
print(f"估计价差: {estimated_spread:.4f}" if estimated_spread is not None else "无法估计(自协方差为正)")
print("-" * 40)
# 2.4 CORWIN-SCHULTZ 价差估计
另一种常用的价差估计方法,适用于高频数据:
def corwin_schultz_spread(high, low, window=1):
    """Estimate the relative bid-ask spread from high/low prices.

    Implements the Corwin-Schultz (2012) estimator on pairs of bars that are
    ``window`` observations apart:

        beta  = ln(H_t/L_t)^2 + ln(H_s/L_s)^2          (s = t + window)
        gamma = ln(max(H_t, H_s) / min(L_t, L_s))^2
        alpha = (sqrt(2*beta) - sqrt(beta)) / (3 - 2*sqrt(2))
                - sqrt(gamma / (3 - 2*sqrt(2)))
        S     = 2 * (exp(alpha) - 1) / (1 + exp(alpha))

    Args:
        high: Sequence of bar highs (Series, array or list).
        low: Sequence of bar lows, same length as ``high``.
        window: Distance, in observations, between the two bars of each pair.
            The default of 1 pairs adjacent bars.

    Returns:
        Mean of the per-pair spread estimates, with negative estimates set to
        zero before averaging. NaN when there are not enough observations to
        form a single pair.

    Raises:
        ValueError: If ``window < 1`` or ``high`` and ``low`` differ in length.
    """
    if window < 1:
        raise ValueError("window must be >= 1")

    log_high = np.log(np.asarray(high, dtype=float))
    log_low = np.log(np.asarray(low, dtype=float))
    if log_high.shape != log_low.shape:
        raise ValueError("high and low must have the same length")
    if log_high.size <= window:
        return float('nan')

    high_first, high_second = log_high[:-window], log_high[window:]
    low_first, low_second = log_low[:-window], log_low[window:]

    # Sum of the two single-bar squared log ranges.
    beta = (high_first - low_first) ** 2 + (high_second - low_second) ** 2
    # Squared log range of the combined two-bar period.
    gamma = (np.maximum(high_first, high_second)
             - np.minimum(low_first, low_second)) ** 2

    k = 3 - 2 * np.sqrt(2)
    alpha = (np.sqrt(2 * beta) - np.sqrt(beta)) / k - np.sqrt(gamma / k)
    spread = 2 * (np.exp(alpha) - 1) / (1 + np.exp(alpha))

    # Volatile pairs can give negative estimates; floor them at zero.
    spread = np.maximum(spread, 0.0)
    return float(np.nanmean(spread))


# 三、价格冲击模型
3.1 价格冲击的来源
交易对价格的影响路径:
大单交易 → 消耗订单簿流动性 → 价格不利移动 → 价格冲击
两种冲击类型:
├── 临时冲击: 流动性成本,随时间恢复
└── 永久冲击: 信息释放,价格永久重估
3.2 Kyle 模型 (1985)
$$\Delta p_t = \lambda\, x_t + \varepsilon_t$$
其中 $\lambda$ 是价格冲击系数,$x_t$ 是带方向的成交量(交易方向 × 成交量)。
def estimate_kyle_lambda(prices, volumes, direction):
    """Estimate Kyle's price-impact coefficient by ordinary least squares.

    Fits ``price_change = intercept + lambda * (direction * volume)`` on the
    observations where both sides are available (the first price has no
    change and is dropped).

    Args:
        prices: Series of prices.
        volumes: Series of traded volumes, indexed like ``prices``.
        direction: Series of trade signs (+1 buy, -1 sell), indexed like
            ``prices``.

    Returns:
        Dict with ``lambda`` (price change per unit of signed volume),
        ``r_squared`` and ``intercept``. All three are NaN when fewer than two
        usable observations exist or the signed volume has no variation, since
        the slope is then undefined.
    """
    aligned = pd.DataFrame({
        'price_change': prices.diff(),
        'trade_intensity': direction * volumes,
    }).dropna()

    x = aligned['trade_intensity'].to_numpy(dtype=float)
    y = aligned['price_change'].to_numpy(dtype=float)

    undefined = {
        'lambda': float('nan'),
        'r_squared': float('nan'),
        'intercept': float('nan'),
    }
    if x.size < 2:
        return undefined

    x_dev = x - x.mean()
    y_dev = y - y.mean()
    sxx = float(np.dot(x_dev, x_dev))
    if sxx == 0:
        return undefined

    # Closed-form single-regressor OLS with intercept.
    kyle_lambda = float(np.dot(x_dev, y_dev)) / sxx
    intercept = float(y.mean() - kyle_lambda * x.mean())

    residuals = y - (intercept + kyle_lambda * x)
    ss_res = float(np.dot(residuals, residuals))
    ss_tot = float(np.dot(y_dev, y_dev))
    # A constant response is fitted exactly by the intercept alone.
    r_squared = 1.0 if ss_tot == 0 else 1.0 - ss_res / ss_tot

    return {
        'lambda': kyle_lambda,
        'r_squared': r_squared,
        'intercept': intercept,
    }
# Simulate price-impact data
np.random.seed(42)
n_obs = 500
# Generate trade data
volumes = np.random.lognormal(10, 1, n_obs)  # trade sizes
direction = np.random.choice([1, -1], n_obs)  # trade sign
# True price-impact coefficient used to generate the data
true_lambda = 0.0001
# Price change = baseline noise + impact
base_price_change = np.random.normal(0, 0.01, n_obs)
price_impact = true_lambda * direction * volumes
price_changes = base_price_change + price_impact
prices = 100 + np.cumsum(price_changes)
# Estimate Kyle's lambda
result = estimate_kyle_lambda(
    pd.Series(prices),
    pd.Series(volumes),
    pd.Series(direction)
)
print("\n【Kyle 价格冲击模型】")
print("-" * 50)
print(f"真实 λ: {true_lambda:.6f}")
print(f"估计 λ: {result['lambda']:.6f}")
print(f"R²: {result['r_squared']:.4f}")
print("-" * 50)
# Visualise the price impact
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(
    direction * volumes,
    price_changes,
    alpha=0.5,
    s=20,
    label='实际数据'
)
# Overlay the fitted regression line
x_line = np.linspace(min(direction * volumes), max(direction * volumes), 100)
y_line = result['intercept'] + result['lambda'] * x_line
plt.plot(x_line, y_line, 'r-', linewidth=2, label=f'拟合线 (λ={result["lambda"]:.6f})')
plt.xlabel('交易强度(方向 × 成交量)')
plt.ylabel('价格变化')
plt.title('Kyle 模型: 交易量 vs 价格冲击')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
# Cumulative price path with and without the impact component
cumulative_impact = np.cumsum(price_impact)
cumulative_base = np.cumsum(base_price_change)
plt.plot(cumulative_base, label='基础价格变化', alpha=0.7)
plt.plot(cumulative_base + cumulative_impact, label='实际价格(含冲击)', linewidth=2)
plt.xlabel('时间')
plt.ylabel('累计价格变化')
plt.title('价格冲击的累积效应')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 3.3 Amihud 非流动性指标
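$$\mathrm{ILLIQ} = \frac{1}{D}\sum_{d=1}^{D}\frac{|R_d|}{V_d}$$
其中 $D$ 是天数,$R_d$ 是第 $d$ 日收益率,$V_d$ 是第 $d$ 日成交量。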
def calculate_amihud_illiquidity(returns, volumes, scaling_factor=1e9):
    """Compute the Amihud illiquidity measure: mean of |return| / volume.

    Args:
        returns: Series of per-period returns.
        volumes: Series of per-period volumes, aligned with ``returns`` by
            index.
        scaling_factor: Multiplier applied to the mean ratio (default 1e9).

    Returns:
        The scaled mean of ``|return| / volume`` over periods with positive
        volume. Periods with zero (or negative) volume are excluded, because
        the ratio is undefined there. NaN when no usable period exists.
    """
    # Mask non-positive volume so it yields NaN (skipped by mean) instead of inf.
    traded_volume = volumes.where(volumes > 0)
    illiquidity_per_period = returns.abs() / traded_volume
    return illiquidity_per_period.mean() * scaling_factor
# Worked example on random data
returns_sample = pd.Series(np.random.normal(0.001, 0.02, 100))
volumes_sample = pd.Series(np.random.lognormal(10, 1, 100))
amihud_value = calculate_amihud_illiquidity(returns_sample, volumes_sample)
print("\n【Amihud 非流动性指标】")
print("-" * 40)
print(f"Amihud 值: {amihud_value:.6f}")
print("解释: Amihud 值越大,流动性越差(单位成交量的价格冲击大)")
print("-" * 40)
# 四、知情交易者与流动性提供者
4.1 PIN (Probability of Informed Trading)
PIN 衡量市场中知情交易者的比例:
$$\mathrm{PIN} = \frac{\alpha\mu}{\alpha\mu + 2\varepsilon}$$
其中:
- $\alpha$: 信息事件发生的概率
- $\mu$: 知情交易者的交易速率
- $\varepsilon$: 不知情交易者的交易速率
4.2 订单流不平衡 (Order Flow Imbalance)
def calculate_ofi(bid_prices, ask_prices, bid_volumes, ask_volumes):
    """Compute order flow imbalance (OFI) between consecutive book snapshots.

    For every price level and every pair of consecutive snapshots, with
    previous price/size ``(P', q')`` and current price/size ``(P, q)``:

        bid contribution = [P_b >= P_b'] * q_b - [P_b <= P_b'] * q_b'
        ask contribution = [P_a >= P_a'] * q_a' - [P_a <= P_a'] * q_a

    so added bid depth or a higher bid counts as buying pressure (positive),
    and added ask depth or a lower ask counts as selling pressure (negative).
    Contributions are summed across levels.

    Args:
        bid_prices: Bid prices, one row per snapshot and one column per level
            (2-D array-like, DataFrame, or list of per-snapshot Series). A
            1-D input is treated as a single level.
        ask_prices: Ask prices, same layout.
        bid_volumes: Bid sizes, same layout.
        ask_volumes: Ask sizes, same layout.

    Returns:
        Series of length ``n_snapshots - 1`` holding the OFI of each
        transition (empty when fewer than two snapshots are given).

    Raises:
        ValueError: If the four inputs do not share the same shape.
    """
    def _as_matrix(data):
        # Rows are snapshots, columns are levels.
        matrix = np.asarray(data, dtype=float)
        return matrix.reshape(-1, 1) if matrix.ndim == 1 else matrix

    bid_p, ask_p, bid_v, ask_v = (
        _as_matrix(x) for x in (bid_prices, ask_prices, bid_volumes, ask_volumes)
    )
    if not (bid_p.shape == ask_p.shape == bid_v.shape == ask_v.shape):
        raise ValueError("all four inputs must have the same shape")
    if len(bid_p) < 2:
        return pd.Series([], dtype=float)

    bid_p_prev, bid_p_now = bid_p[:-1], bid_p[1:]
    ask_p_prev, ask_p_now = ask_p[:-1], ask_p[1:]
    bid_v_prev, bid_v_now = bid_v[:-1], bid_v[1:]
    ask_v_prev, ask_v_now = ask_v[:-1], ask_v[1:]

    bid_flow = (
        np.where(bid_p_now >= bid_p_prev, bid_v_now, 0.0)
        - np.where(bid_p_now <= bid_p_prev, bid_v_prev, 0.0)
    )
    ask_flow = (
        np.where(ask_p_now >= ask_p_prev, ask_v_prev, 0.0)
        - np.where(ask_p_now <= ask_p_prev, ask_v_now, 0.0)
    )
    return pd.Series((bid_flow + ask_flow).sum(axis=1))
# Simplified OFI (best bid/ask only)
def simple_ofi(mid_prices, returns):
    """Proxy order flow by the signed magnitude of returns.

    A positive return is read as net buying pressure (positive OFI) and a
    negative return as net selling pressure. ``mid_prices`` is accepted but
    not used by the computation.
    """
    direction = np.sign(returns)
    magnitude = returns.abs()
    return direction * magnitude
# Simulated test of OFI's predictive power
np.random.seed(42)
n_days = 500
# OFI signal drawn from a standard normal distribution
ofi_signal = np.random.normal(0, 1, n_days)
# Future returns are constructed to depend linearly on OFI
future_returns = (
    0.0005 +  # baseline return
    0.001 * ofi_signal +  # linear OFI effect
    np.random.normal(0, 0.02, n_days)  # noise
)
# Measure the linear relationship
from scipy.stats import pearsonr
correlation, p_value = pearsonr(ofi_signal, future_returns)
print("\n【订单流不平衡 (OFI) 预测力测试】")
print("-" * 60)
print(f"OFI 与未来收益的相关系数: {correlation:.4f}")
print(f"P 值: {p_value:.4f}")
print(f"{'显著' if p_value < 0.05 else '不显著'}")
print("-" * 60)
# Visualisation
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
# Split OFI into five quantile groups and average the return within each
ofi_quintiles = pd.qcut(pd.Series(ofi_signal), 5, labels=['Q1', 'Q2', 'Q3', 'Q4', 'Q5'])
returns_by_quintile = pd.DataFrame({
    'ofi_group': ofi_quintiles,
    'return': future_returns
}).groupby('ofi_group')['return'].mean()
colors = ['green' if x > 0 else 'red' for x in returns_by_quintile.values]
returns_by_quintile.plot(kind='bar', color=colors, edgecolor='black')
plt.axhline(0, color='black', linewidth=0.8)
plt.xlabel('OFI 分组(Q1=最负,Q5=最正)')
plt.ylabel('平均收益')
plt.title('OFI 与未来收益的关系')
plt.grid(True, alpha=0.3, axis='y')
plt.subplot(1, 2, 2)
plt.scatter(ofi_signal, future_returns, alpha=0.5, s=20)
# Degree-1 polynomial fit, drawn over the scatter
z = np.polyfit(ofi_signal, future_returns, 1)
p = np.poly1d(z)
plt.plot(ofi_signal, p(ofi_signal), "r-", linewidth=2, label=f'拟合线 (斜率={z[0]:.4f})')
plt.xlabel('订单流不平衡 (OFI)')
plt.ylabel('未来收益')
plt.title('OFI 预测收益散点图')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 五、VWAP / TWAP 执行算法
5.1 VWAP (Volume Weighted Average Price)
目标: 以接近市场成交量加权平均价的价格执行订单。
def calculate_vwap(prices, volumes):
    """Return the volume-weighted average price.

    VWAP = sum(price * volume) / sum(volume)
    """
    traded_value = (prices * volumes).sum()
    traded_volume = volumes.sum()
    return traded_value / traded_volume
def vwap_execution_algorithm(target_quantity, historical_volumes, num_slices):
    """Split a parent order across periods in proportion to historical volume.

    Quantities are whole units and always add up to ``target_quantity``
    (rounded to the nearest integer): each period first receives the floor of
    its proportional share, then the leftover units go to the periods with
    the largest fractional parts (earliest period first on ties).

    Args:
        target_quantity: Total quantity to execute; a negative value produces
            negative allocations.
        historical_volumes: Historical volume per period (Series or array).
        num_slices: Accepted for interface compatibility but not used; the
            result always has one entry per element of ``historical_volumes``.

    Returns:
        Allocation per period: a Series sharing the input's index when a
        Series is passed, otherwise a numpy array.

    Raises:
        ValueError: If the historical volumes do not sum to a positive number.
    """
    volumes = np.asarray(historical_volumes, dtype=float)
    total_volume = volumes.sum()
    if not total_volume > 0:
        raise ValueError("historical_volumes must sum to a positive number")

    sign = -1.0 if target_quantity < 0 else 1.0
    exact = abs(target_quantity) * volumes / total_volume

    allocation = np.floor(exact)
    leftover = int(round(abs(target_quantity) - allocation.sum()))
    if leftover > 0:
        # Stable sort keeps the earliest period first among equal remainders.
        by_remainder = np.argsort(-(exact - allocation), kind='stable')
        allocation[by_remainder[:leftover]] += 1
    allocation = sign * allocation

    if isinstance(historical_volumes, pd.Series):
        return pd.Series(allocation, index=historical_volumes.index,
                         name=historical_volumes.name)
    return allocation
# VWAP algorithm simulation
np.random.seed(42)
n_periods = 100
# Simulated intraday volume profile, U-shaped: heavy at the open and close,
# light at midday. A Gaussian bump centred on midday is subtracted from a
# constant so the profile dips in the middle while staying positive.
midday_bump = np.exp(-((np.arange(n_periods) - n_periods/2)**2) / (2 * (n_periods/6)**2))
time_pattern = 1.2 - midday_bump
intraday_volumes = np.random.poisson(1000 * time_pattern)
# Parent order size
target_qty = 50000
# Allocate with the VWAP algorithm
vwap_allocation = vwap_execution_algorithm(
    target_qty,
    pd.Series(intraday_volumes),
    10
)
print("\n【VWAP 执行算法】")
print("-" * 60)
print(f"目标交易量: {target_qty:,}")
print(f"实际分配总量: {vwap_allocation.sum():,}")
print(f"\n时间片分配:")
for i, qty in enumerate(vwap_allocation[:10]):
    print(f" 时间片 {i+1}: {qty:,.0f} 股 ({qty/target_qty*100:.1f}%)")
print("-" * 60)
# Visualisation
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# VWAP allocation per period
ax1 = axes[0]
ax1.bar(range(len(vwap_allocation)), vwap_allocation, edgecolor='black', alpha=0.7)
ax1.set_xlabel('时间片')
ax1.set_ylabel('分配交易量')
ax1.set_title('VWAP 算法: 按成交量比例分配')
ax1.grid(True, alpha=0.3, axis='y')
# Compare against the historical volume profile
ax2 = axes[1]
ax2.plot(intraday_volumes, label='历史成交量模式', linewidth=2, color='blue')
ax2_twin = ax2.twinx()
# Rescale the allocation to the volume profile's level for comparison
vwap_normalized = vwap_allocation.values / vwap_allocation.sum() * len(vwap_allocation) * intraday_volumes.mean()
ax2_twin.bar(range(len(vwap_allocation)), vwap_normalized,
             alpha=0.5, color='orange', label='VWAP 分配(归一化)')
ax2.set_xlabel('时间')
ax2.set_ylabel('成交量', color='blue')
ax2_twin.set_ylabel('VWAP 分配量', color='orange')
ax2.set_title('VWAP 分配与成交量模式匹配')
ax2.legend(loc='upper left')
ax2_twin.legend(loc='upper right')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 5.2 TWAP (Time Weighted Average Price)
目标: 在给定时间内均匀分配交易量。
def twap_execution_algorithm(target_quantity, num_periods):
    """Split a parent order evenly across ``num_periods`` periods.

    Args:
        target_quantity: Total quantity to execute.
        num_periods: Number of periods.

    Returns:
        Array of length ``num_periods`` in which every entry equals
        ``target_quantity / num_periods``.
    """
    equal_share = target_quantity / num_periods
    return np.ones(num_periods) * equal_share
# TWAP algorithm example
twap_allocation = twap_execution_algorithm(target_qty, 20)
print("\n【TWAP 执行算法】")
print("-" * 60)
print(f"目标交易量: {target_qty:,}")
print(f"时间片数: 20")
print(f"每片分配: {target_qty/20:,.0f} 股")
print("-" * 60)
# VWAP vs TWAP comparison
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# VWAP: allocation bars with the scaled volume profile overlaid
ax1 = axes[0]
ax1.bar(range(len(vwap_allocation)), vwap_allocation, edgecolor='black', alpha=0.7)
ax1.plot(intraday_volumes / intraday_volumes.sum() * target_qty,
         'r-', linewidth=2, label='成交量模式')
ax1.set_xlabel('时间片')
ax1.set_ylabel('交易量')
ax1.set_title('VWAP: 跟随成交量模式')
ax1.legend()
ax1.grid(True, alpha=0.3, axis='y')
# TWAP: equal bars with the per-period target as a reference line
ax2 = axes[1]
ax2.bar(range(len(twap_allocation)), twap_allocation, edgecolor='black', alpha=0.7)
ax2.axhline(target_qty/len(twap_allocation), color='red', linestyle='--',
            linewidth=2, label='目标量/时间片')
ax2.set_xlabel('时间片')
ax2.set_ylabel('交易量')
ax2.set_title('TWAP: 均匀分配')
ax2.legend()
ax2.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()
# 5.3 执行质量评估
def evaluate_execution_quality(execution_prices, benchmark_prices, execution_volumes):
    """Measure execution slippage against a benchmark price series.

    Both the execution prices and the benchmark prices are averaged with the
    execution volumes as weights.

    Returns:
        Dict with ``execution_vwap``, ``benchmark_vwap``, ``slippage``
        (execution minus benchmark) and ``slippage_bps`` (slippage relative
        to the benchmark, in basis points).
    """
    filled = execution_volumes.sum()
    achieved = (execution_prices * execution_volumes).sum() / filled
    reference = (benchmark_prices * execution_volumes).sum() / filled

    shortfall = achieved - reference
    return {
        'execution_vwap': achieved,
        'benchmark_vwap': reference,
        'slippage': shortfall,
        'slippage_bps': shortfall / reference * 10000,  # basis points
    }


# 六、实现波动率 (Realized Volatility)
6.1 定义
实现波动率 $RV_t$ 是利用高频数据计算的实际波动率:
$$RV_t = \sum_{i=1}^{n} r_{t,i}^2$$
其中 $r_{t,i}$ 是第 $t$ 日第 $i$ 个区间的高频收益率;对 $RV_t$ 年化后开平方即得到年化波动率。
6.2 Python 实现
def calculate_realized_volatility(prices, frequency='5min', trading_days=252):
    """Compute annualised realized volatility from high-frequency prices.

    Prices are resampled to ``frequency`` (last price per bin, empty bins
    dropped), simple returns are squared and summed to the realized variance,
    which is averaged per calendar date present in the data, scaled by
    ``trading_days`` and square-rooted.

    Args:
        prices: Series of prices indexed by a ``DatetimeIndex``.
        frequency: Resampling frequency string, e.g. ``'1min'`` or ``'5min'``.
        trading_days: Number of trading days per year used to annualise.

    Returns:
        Annualised realized volatility, or NaN when fewer than two sampled
        prices are available.

    Raises:
        TypeError: If ``prices`` is not indexed by a ``DatetimeIndex``.
    """
    if not isinstance(prices.index, pd.DatetimeIndex):
        raise TypeError("prices must be indexed by a DatetimeIndex to be resampled")

    # Drop empty bins (e.g. overnight) so they are not forward-filled into returns.
    sampled_prices = prices.resample(frequency).last().dropna()
    returns = sampled_prices.pct_change().dropna()
    if returns.empty:
        return float('nan')

    realized_variance = (returns ** 2).sum()

    # The sum already covers every intraday interval, so the only scaling
    # needed is from the number of days observed to a year.
    n_days = sampled_prices.index.normalize().nunique()
    annualized_variance = realized_variance / n_days * trading_days
    return np.sqrt(annualized_variance)
# Simulate one day of high-frequency data
np.random.seed(42)
n_minutes = 390  # 390 one-minute bars in a 6.5-hour session
# Minute-level prices from a geometric Brownian motion
drift = 0.05 / 252 / 390  # 5% annual drift expressed per minute
vol = 0.2 / np.sqrt(252 * 390)  # 20% annualised volatility expressed per minute
minute_returns = np.random.normal(drift, vol, n_minutes)
minute_prices = 100 * np.exp(np.cumsum(minute_returns))
# Realized volatility at several sampling frequencies.
# Resampling needs timestamps, so the bars get a one-minute DatetimeIndex
# (the calendar date itself is arbitrary).
price_series = pd.Series(
    minute_prices,
    index=pd.date_range('2024-01-02 09:30', periods=n_minutes, freq='1min'),
)
rv_1min = calculate_realized_volatility(price_series, '1min')
rv_5min = calculate_realized_volatility(price_series, '5min')
rv_1h = calculate_realized_volatility(price_series, '1h')
print("\n【实现波动率计算】")
print("-" * 50)
print(f"1 分钟 RV: {rv_1min:.4f}")
print(f"5 分钟 RV: {rv_5min:.4f}")
print(f"1 小时 RV: {rv_1h:.4f}")
print("-" * 50)
# Realized bipower variation - a volatility estimate that is more robust to jumps
def calculate_bipower_variation(returns):
    """Return the realized bipower variation of a return series.

    Computed as (pi / 2) times the sum of products of adjacent absolute
    returns.
    """
    magnitudes = np.abs(returns.values)
    adjacent_products = magnitudes[:-1] * magnitudes[1:]
    return (np.pi / 2) * adjacent_products.sum()
# Visualisation
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
# Price path
ax1 = axes[0, 0]
ax1.plot(price_series.values, linewidth=1)
ax1.set_title('模拟日内高频价格路径')
ax1.set_xlabel('分钟')
ax1.set_ylabel('价格')
ax1.grid(True, alpha=0.3)
# Return distribution
ax2 = axes[0, 1]
ax2.hist(minute_returns, bins=50, edgecolor='black', alpha=0.7)
ax2.axvline(0, color='red', linestyle='--', linewidth=2)
ax2.set_title('分钟收益率分布')
ax2.set_xlabel('收益率')
ax2.set_ylabel('频数')
ax2.grid(True, alpha=0.3)
# Rolling volatility, annualised with sqrt(390 * 252)
ax3 = axes[1, 0]
window = 30
rolling_vol = pd.Series(minute_returns).rolling(window).std() * np.sqrt(390 * 252)
ax3.plot(rolling_vol.values, linewidth=2)
ax3.axhline(0.2, color='red', linestyle='--', linewidth=2, label='真实波动率 20%')
ax3.set_title(f'滚动波动率({window}分钟窗口)')
ax3.set_xlabel('分钟')
ax3.set_ylabel('年化波动率')
ax3.legend()
ax3.grid(True, alpha=0.3)
# Cumulative squared returns (the running sum that makes up RV)
ax4 = axes[1, 1]
cumulative_rv = np.cumsum(np.array(minute_returns) ** 2)
ax4.plot(cumulative_rv, linewidth=2, label='累计 RV')
# Overlay a straight-line fit
z = np.polyfit(range(len(cumulative_rv)), cumulative_rv, 1)
p = np.poly1d(z)
ax4.plot(range(len(cumulative_rv)), p(range(len(cumulative_rv))),
         'r--', linewidth=2, label='线性拟合')
ax4.set_title('累计平方收益(RV)')
ax4.set_xlabel('分钟')
ax4.set_ylabel('累计 RV')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 七、微价格 (Microprice)
7.1 定义
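微价格 $P_{\text{micro}}$ 是利用订单簿深度信息估计的公允价格:
$$P_{\text{micro}} = \frac{P_{\text{bid}}\,V_{\text{ask}} + P_{\text{ask}}\,V_{\text{bid}}}{V_{\text{bid}} + V_{\text{ask}}}$$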
7.2 Python 实现
def calculate_microprice(bid_price, ask_price, bid_volume, ask_volume):
    """Return the depth-weighted mid price (microprice).

    The bid price is weighted by the ask volume and the ask price by the bid
    volume. With no volume on either side, the plain mid price is returned.
    """
    total_depth = bid_volume + ask_volume
    if total_depth == 0:
        return (bid_price + ask_price) / 2

    cross_weighted = bid_price * ask_volume + ask_price * bid_volume
    return cross_weighted / (bid_volume + ask_volume)
# Simulated quotes
np.random.seed(42)
n_obs = 100
bid_prices = np.random.uniform(9.8, 10, n_obs)
ask_prices = bid_prices + np.random.uniform(0.01, 0.05, n_obs)
bid_volumes = np.random.uniform(100, 1000, n_obs)
ask_volumes = np.random.uniform(100, 1000, n_obs)
# Microprice for every quote
microprices = [
    calculate_microprice(bp, ap, bv, av)
    for bp, ap, bv, av in zip(bid_prices, ask_prices, bid_volumes, ask_volumes)
]
mid_prices = (bid_prices + ask_prices) / 2
# Compare microprice with mid price
print("\n【微价格 vs 中间价】")
print("-" * 60)
print(f"{'指标':<20} {'微价格':<15} {'中间价':<15}")
print("-" * 60)
print(f"{'均值':<20} {np.mean(microprices):<15.4f} {np.mean(mid_prices):<15.4f}")
print(f"{'标准差':<20} {np.std(microprices):<15.4f} {np.std(mid_prices):<15.4f}")
print("-" * 60)
print("微价格考虑了订单簿深度,更能反映真实交易成本")
# Visualisation
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(mid_prices, label='中间价', linewidth=2, alpha=0.7)
plt.plot(microprices, label='微价格', linewidth=2, alpha=0.7)
plt.xlabel('时间')
plt.ylabel('价格')
plt.title('微价格 vs 中间价')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.scatter(mid_prices, microprices, alpha=0.5, s=20)
# 45-degree reference line spanning the range of both series
min_val = min(min(mid_prices), min(microprices))
max_val = max(max(mid_prices), max(microprices))
plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2)
plt.xlabel('中间价')
plt.ylabel('微价格')
plt.title('微价格与中间价的关系')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 八、做市商行为分析
8.1 价差设置
做市商如何设置买卖价差?
8.2 库存管理
def market_maker_inventory_management(current_inventory, target_inventory, price_deviation,
                                      inventory_aversion=0.01, deviation_weight=0.5):
    """Compute a market maker's quote shift from inventory and mispricing.

    Args:
        current_inventory: Current position.
        target_inventory: Desired position (typically 0).
        price_deviation: How far the current price sits from fair value.
        inventory_aversion: Quote shift per unit of inventory above target.
            The shift is applied with a negative sign, so excess inventory
            lowers the quotes.
        deviation_weight: Fraction of ``price_deviation`` passed through to
            the quotes.

    Returns:
        The quote adjustment:
        ``-inventory_aversion * (current - target) + price_deviation * deviation_weight``.
    """
    inventory_penalty = -inventory_aversion * (current_inventory - target_inventory)
    quote_adjustment = inventory_penalty + price_deviation * deviation_weight
    return quote_adjustment
# Simulate market-maker behaviour
np.random.seed(42)
n_steps = 100
inventory = 0
target_inventory = 0
price_deviations = np.random.normal(0, 0.01, n_steps)
inventory_history = []
quote_adjustments = []
for deviation in price_deviations:
    # Random fill: a trade happens with 30% probability at each step
    if np.random.random() < 0.3:
        trade = np.random.choice([-1, 1])  # -1 or +1 unit of inventory
        inventory += trade
    # Compute the quote adjustment for the current inventory
    adjustment = market_maker_inventory_management(inventory, target_inventory, deviation)
    inventory_history.append(inventory)
    quote_adjustments.append(adjustment)
# Visualisation
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
# Inventory path, shaded green when positive and red when negative
ax1 = axes[0]
ax1.plot(inventory_history, linewidth=2, label='库存')
ax1.axhline(0, color='red', linestyle='--', linewidth=2, label='目标库存')
ax1.fill_between(range(len(inventory_history)), 0, inventory_history,
                 where=np.array(inventory_history) > 0, alpha=0.3, color='green', label='多头头寸')
ax1.fill_between(range(len(inventory_history)), 0, inventory_history,
                 where=np.array(inventory_history) < 0, alpha=0.3, color='red', label='空头头寸')
ax1.set_xlabel('时间步')
ax1.set_ylabel('库存量')
ax1.set_title('做市商库存变化')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Quote adjustment path
ax2 = axes[1]
ax2.plot(quote_adjustments, linewidth=2, label='报价调整')
ax2.axhline(0, color='black', linestyle='--', linewidth=1)
ax2.fill_between(range(len(quote_adjustments)), 0, quote_adjustments,
                 where=np.array(quote_adjustments) > 0, alpha=0.3, color='green')
ax2.fill_between(range(len(quote_adjustments)), 0, quote_adjustments,
                 where=np.array(quote_adjustments) < 0, alpha=0.3, color='red')
ax2.set_xlabel('时间步')
ax2.set_ylabel('报价调整')
ax2.set_title('库存驱动的报价调整')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 九、流动性度量综合
def comprehensive_liquidity_analysis(prices, volumes, bid_prices=None, ask_prices=None):
    """Collect several liquidity measures into one dictionary.

    Args:
        prices: Series of prices.
        volumes: Series of volumes.
        bid_prices: Optional Series of bid prices.
        ask_prices: Optional Series of ask prices.

    Returns:
        Dict of metrics. ``amihud``, ``volume_cv``, ``kyle_lambda`` and
        ``liquidity_score`` are always present. With both quote series given
        it also holds ``absolute_spread``, ``relative_spread`` and
        ``spread_cv``; otherwise it holds ``roll_spread`` instead.
    """
    returns = prices.pct_change().dropna()
    has_quotes = bid_prices is not None and ask_prices is not None

    # 1. Amihud illiquidity
    metrics = {'amihud': calculate_amihud_illiquidity(returns, volumes)}

    if has_quotes:
        # 2. Quoted spread, absolute and relative to the price series
        metrics['absolute_spread'] = (ask_prices - bid_prices).mean()
        metrics['relative_spread'] = ((ask_prices - bid_prices) / prices).mean()
        # 3. Coefficient of variation of the quoted spread
        spreads = ask_prices - bid_prices
        metrics['spread_cv'] = spreads.std() / spreads.mean()
    else:
        # 2. No quotes available: fall back to the Roll estimate on returns
        metrics['roll_spread'] = roll_spread_estimator(returns)

    # 4. Coefficient of variation of volume
    metrics['volume_cv'] = volumes.std() / volumes.mean()

    # 5. Price-impact regression; every trade is treated as a buy (+1)
    all_buys = pd.Series([1] * len(volumes))
    metrics['kyle_lambda'] = estimate_kyle_lambda(prices, volumes, all_buys)['lambda']

    # 6. Composite score: a simplified, ad-hoc weighting of the cost measures
    cost = (
        1 + metrics['amihud'] +
        (metrics.get('relative_spread', 0) * 100) +
        (metrics.get('kyle_lambda', 0) * 10000)
    )
    metrics['liquidity_score'] = 1 / cost
    return metrics
# Combined liquidity analysis example
np.random.seed(42)
n_days = 500
prices = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0.0005, 0.02, n_days))))
volumes = pd.Series(np.random.lognormal(10, 0.5, n_days))
# Quotes sit a random 0.1%-0.5% below / above the price
bid_prices = prices * (1 - np.random.uniform(0.001, 0.005, n_days))
ask_prices = prices * (1 + np.random.uniform(0.001, 0.005, n_days))
liquidity_metrics = comprehensive_liquidity_analysis(prices, volumes, bid_prices, ask_prices)
print("\n【综合流动性分析】")
print("=" * 60)
print(f"{'指标':<25} {'值':<20}")
print("-" * 60)
for metric, value in liquidity_metrics.items():
    # Pick a number format from the metric name; other values below 1 are shown as percentages
    if 'score' in metric:
        print(f"{metric:<25} {value:<20.4f}")
    elif 'amihud' in metric or 'lambda' in metric:
        print(f"{metric:<25} {value:<20.6f}")
    else:
        print(f"{metric:<25} {value:<20.4%}" if value < 1 else f"{metric:<25} {value:<20.4f}")
print("=" * 60)
# 十、高频数据特性
10.1 数据特征
| 特性 | 说明 | 处理方法 |
|---|---|---|
| 数据量大 | 纳秒级时间戳,每日百万条记录 | 分布式计算、采样 |
| 噪声 | 微结构噪声(买卖价差跳跃等) | 已实现波动率、双幂次变差 |
| 非同步交易 | 不同股票最后成交时间不同 | 同步到共同网格 |
| 日内模式 | U 型成交量、开盘收盘波动大 | 去季节性调整 |
10.2 高频特征构造
def construct_high_frequency_features(tick_data, market_close_hour=16.0):
    """Build time, momentum, volatility, volume and order-flow features.

    Args:
        tick_data: DataFrame indexed by a ``DatetimeIndex`` with ``price`` and
            ``volume`` columns and, optionally, a signed ``direction`` column.
            Window lengths below are counted in rows; the ``..min`` feature
            names presumably assume one row per minute (verify against the
            data source).
        market_close_hour: Session close expressed as hours after midnight,
            used for ``time_to_close``. NOTE(review): the 16:00 default is an
            assumption; confirm it for the target market.

    Returns:
        DataFrame on the same index with columns ``hour``, ``minute``,
        ``time_to_close`` (hours remaining until the close; negative after
        it), ``return_{5,10,30,60}min``, ``volatility_{5,10,30}min``,
        ``volume_ma_5``, ``volume_ratio`` and, when ``direction`` is present,
        ``ofi`` (10-row rolling sum of signed volume).
    """
    features = pd.DataFrame(index=tick_data.index)
    timestamps = tick_data.index

    # 1. Time features
    features['hour'] = timestamps.hour
    features['minute'] = timestamps.minute
    # A DatetimeIndex difference is a TimedeltaIndex, which exposes
    # total_seconds() directly (it has no .dt accessor).
    hours_since_midnight = (timestamps - timestamps.normalize()).total_seconds() / 3600
    features['time_to_close'] = market_close_hour - np.asarray(hours_since_midnight)

    # 2. Price momentum at several horizons
    for window in [5, 10, 30, 60]:
        features[f'return_{window}min'] = tick_data['price'].pct_change(window)

    # 3. Rolling volatility of one-row returns
    one_step_returns = tick_data['price'].pct_change()
    for window in [5, 10, 30]:
        features[f'volatility_{window}min'] = one_step_returns.rolling(window).std()

    # 4. Volume relative to its 5-row moving average
    features['volume_ma_5'] = tick_data['volume'].rolling(5).mean()
    features['volume_ratio'] = tick_data['volume'] / features['volume_ma_5']

    # 5. Order flow imbalance, only when trade direction is available
    if 'direction' in tick_data.columns:
        signed_volume = tick_data['direction'] * tick_data['volume']
        features['ofi'] = signed_volume.rolling(10).sum()

    return features


# 十一、核心知识点总结
| 概念 | 核心内容 | Python 实现 |
|---|---|---|
| 订单簿 | 价格优先、时间优先、流动性深度 | OrderBook 类模拟 |
| 买卖价差 | 订单处理成本、库存风险、信息不对称 | Roll 估计、CORWIN-SCHULTZ |
| 价格冲击 | Kyle 模型、Amihud 指标 | 线性回归估计 |
| 执行算法 | VWAP 跟随成交量、TWAP 均匀分配 | 按比例/均匀拆单 |
| 实现波动率 | 高频数据计算实际波动率 | 收益率平方和 |
| OFI | 订单流不平衡预测价格 | 买卖压力度量 |
| 微价格 | 考虑深度的加权中间价 | 深度加权公式 |
关键公式:
买卖价差 = Ask - Bid
相对价差 = (Ask - Bid) / Mid Price
Kyle 模型: Δp = λ × (方向 × Volume) + ε
Amihud: ILLIQ = |R| / Volume
VWAP: Σ(P × V) / Σ(V)
OFI: 买方变化 + 卖方变化
实践要点:
- 交易成本是策略盈利的隐形杀手,必须量化评估
- 流动性差的资产需要更大的价差补偿
- 大单拆分是降低冲击的关键技术
- 高频数据需要特殊的处理方法
下一步: 继续阅读 03-衍生品与波动率.md,了解期权定价和波动率交易。