市场微观结构

核心问题: 你的策略信号很好,但实际交易时会不会被买卖价差和价格冲击吃掉利润?


学习目标

  • 理解订单簿如何决定成交价格
  • 掌握买卖价差的来源和度量方法
  • 学习价格冲击模型和流动性指标
  • 实现 VWAP/TWAP 执行算法
  • 理解高频数据的特性与处理
  • 核心: 交易成本是策略盈利的隐形杀手

一、订单簿 (Order Book) 结构

1.1 什么是订单簿?

订单簿是实时记录所有未成交买卖订单的数据库,反映了市场的供需状态

          订单簿结构示意
          ─────────────────

    卖方 (Ask)        价格        买方 (Bid)
    ───────────────────────────────────────
    100股 @ 10.05     ┃        150股 @ 10.00
     50股 @ 10.04     ┃        200股 @  9.99
    200股 @ 10.03     ┃        100股 @  9.98
    150股 @ 10.02 ←─ 最优卖价 (Ask)    ┃        300股 @  9.97 ←─ 最优买价 (Bid)
    ───────────────────────────────────────

    买卖价差 = 10.02 - 9.97 = 0.05
    中间价   = (10.02 + 9.97) / 2 = 9.995

1.2 价格优先与时间优先

订单成交遵循两个原则:

优先级原则说明
1价格优先买入价高的优先,卖出价低的优先
2时间优先同价格情况下,下单时间早的优先

1.3 订单簿的 Python 模拟

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict
import heapq
 
class OrderBook:
    """订单簿模拟器"""
 
    def __init__(self, tick_size=0.01):
        """
        初始化订单簿
 
        参数:
            tick_size: 最小价格变动单位
        """
        self.tick_size = tick_size
        self.bids = {}   # 买方订单: {price: [(order_id, quantity, timestamp), ...]}
        self.asks = {}   # 卖方订单: {price: [(order_id, quantity, timestamp), ...]}
        self.order_id = 0
        self.trade_history = []  # 成交记录
 
    def add_limit_order(self, side, price, quantity, timestamp=None):
        """
        添加限价订单
 
        参数:
            side: 'bid'(买) 或 'ask'(卖)
            price: 价格
            quantity: 数量
            timestamp: 时间戳
 
        返回:
            成交记录列表
        """
        if timestamp is None:
            timestamp = len(self.trade_history)
 
        self.order_id += 1
        order = (self.order_id, quantity, timestamp)
 
        trades = []
 
        if side == 'bid':
            # 买入订单: 先检查能否与卖方订单成交
            while quantity > 0 and self.asks:
                best_ask = min(self.asks.keys())
                if price < best_ask:
                    break  # 买价低于最优卖价,无法成交
 
                # 与最优卖价成交
                ask_queue = self.asks[best_ask]
                while quantity > 0 and ask_queue:
                    ask_order = ask_queue[0]
                    ask_id, ask_qty, ask_time = ask_order
 
                    trade_qty = min(quantity, ask_qty)
                    trade_price = best_ask
 
                    trades.append({
                        'price': trade_price,
                        'quantity': trade_qty,
                        'timestamp': timestamp,
                        'bid_order_id': self.order_id,
                        'ask_order_id': ask_id
                    })
 
                    quantity -= trade_qty
                    ask_qty -= trade_qty
 
                    if ask_qty == 0:
                        # 卖方订单完全成交,移除
                        heapq.heappop(ask_queue)
                        if not ask_queue:
                            del self.asks[best_ask]
                    else:
                        # 更新卖方订单剩余数量
                        ask_queue[0] = (ask_id, ask_qty, ask_time)
 
            # 剩余数量加入买方订单簿
            if quantity > 0:
                if price not in self.bids:
                    self.bids[price] = []
                heapq.heappush(self.bids[price], order)
 
        else:  # side == 'ask'
            # 卖出订单: 先检查能否与买方订单成交
            while quantity > 0 and self.bids:
                best_bid = max(self.bids.keys())
                if price > best_bid:
                    break  # 卖价高于最优买价,无法成交
 
                # 与最优买价成交
                bid_queue = self.bids[best_bid]
                while quantity > 0 and bid_queue:
                    bid_order = bid_queue[0]
                    bid_id, bid_qty, bid_time = bid_order
 
                    trade_qty = min(quantity, bid_qty)
                    trade_price = best_bid
 
                    trades.append({
                        'price': trade_price,
                        'quantity': trade_qty,
                        'timestamp': timestamp,
                        'bid_order_id': bid_id,
                        'ask_order_id': self.order_id
                    })
 
                    quantity -= trade_qty
                    bid_qty -= trade_qty
 
                    if bid_qty == 0:
                        # 买方订单完全成交,移除
                        heapq.heappop(bid_queue)
                        if not bid_queue:
                            del self.bids[best_bid]
                    else:
                        # 更新买方订单剩余数量
                        bid_queue[0] = (bid_id, bid_qty, bid_time)
 
            # 剩余数量加入卖方订单簿
            if quantity > 0:
                if price not in self.asks:
                    self.asks[price] = []
                heapq.heappush(self.asks[price], order)
 
        self.trade_history.extend(trades)
        return trades
 
    def get_best_bid(self):
        """获取最优买价和数量"""
        if not self.bids:
            return None, 0
        best_price = max(self.bids.keys())
        total_qty = sum(order[1] for order in self.bids[best_price])
        return best_price, total_qty
 
    def get_best_ask(self):
        """获取最优卖价和数量"""
        if not self.asks:
            return None, 0
        best_price = min(self.asks.keys())
        total_qty = sum(order[1] for order in self.asks[best_price])
        return best_price, total_qty
 
    def get_spread(self):
        """获取买卖价差"""
        best_bid, _ = self.get_best_bid()
        best_ask, _ = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return best_ask - best_bid
 
    def get_mid_price(self):
        """获取中间价"""
        best_bid, _ = self.get_best_bid()
        best_ask, _ = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return (best_bid + best_ask) / 2
 
    def get_depth(self, n_levels=5):
        """
        获取订单簿深度
 
        返回:
            (bid_levels, ask_levels) 每个是 [(price, total_quantity), ...]
        """
        # 买方: 价格从高到低
        bid_prices = sorted(self.bids.keys(), reverse=True)[:n_levels]
        bid_levels = [
            (price, sum(order[1] for order in self.bids[price]))
            for price in bid_prices
        ]
 
        # 卖方: 价格从低到高
        ask_prices = sorted(self.asks.keys())[:n_levels]
        ask_levels = [
            (price, sum(order[1] for order in self.asks[price]))
            for price in ask_prices
        ]
 
        return bid_levels, ask_levels
 
    def visualize(self, n_levels=10):
        """可视化订单簿"""
        bid_levels, ask_levels = self.get_depth(n_levels)
 
        print("\n" + "=" * 50)
        print("订单簿状态")
        print("=" * 50)
        print(f"{'卖方 (Ask)':<30} {'价格':<10} {'买方 (Bid)':<30}")
        print("-" * 50)
 
        max_len = max(len(bid_levels), len(ask_levels))
 
        for i in range(max_len):
            ask_str = ""
            price_str = ""
            bid_str = ""
 
            if i < len(ask_levels):
                ask_price, ask_qty = ask_levels[i]
                ask_str = f"{ask_qty:>6}股"
 
            if i < len(bid_levels):
                bid_price, bid_qty = bid_levels[i]
                bid_str = f"{bid_qty:>6}股"
 
            if i < len(ask_levels) and i < len(bid_levels):
                price_str = f"{ask_levels[i][0]:.2f}"
            elif i < len(ask_levels):
                price_str = f"{ask_levels[i][0]:.2f}"
            elif i < len(bid_levels):
                price_str = f"{bid_levels[i][0]:.2f}"
 
            print(f"{ask_str:<30} {price_str:<10} {bid_str:<30}")
 
        best_bid, bid_qty = self.get_best_bid()
        best_ask, ask_qty = self.get_best_ask()
        spread = self.get_spread()
        mid = self.get_mid_price()
 
        print("-" * 50)
        print(f"最优买价: {best_bid} ({bid_qty}股)")
        print(f"最优卖价: {best_ask} ({ask_qty}股)")
        print(f"买卖价差: {spread} ({spread/mid*100:.2f}%)" if spread else "买卖价差: N/A")
        print(f"中间价: {mid}")
        print("=" * 50)
 
 
# 模拟订单簿
print("=" * 60)
print("订单簿模拟示例")
print("=" * 60)
 
# 创建订单簿
book = OrderBook(tick_size=0.01)
 
# 添加一些初始订单
print("\n【添加初始订单】")
book.add_limit_order('bid', 10.00, 150, timestamp=0)
book.add_limit_order('bid', 9.99, 200, timestamp=1)
book.add_limit_order('bid', 9.98, 100, timestamp=2)
book.add_limit_order('ask', 10.02, 150, timestamp=3)
book.add_limit_order('ask', 10.03, 200, timestamp=4)
book.add_limit_order('ask', 10.04, 100, timestamp=5)
 
book.visualize()
 
# 添加一个市价买入单(会立即成交)
print("\n【市价买入 300 股】")
trades = book.add_limit_order('bid', 10.10, 300, timestamp=6)  # 高价确保成交
print(f"成交了 {len(trades)} 笔:")
for trade in trades:
    print(f"  价格 {trade['price']}, 数量 {trade['quantity']}")
 
book.visualize()
 
# 添加一个限价卖单
print("\n【限价卖出 100 股 @ 10.01】")
trades = book.add_limit_order('ask', 10.01, 100, timestamp=7)
if trades:
    print(f"成交了 {len(trades)} 笔")
else:
    print("未能成交,加入订单簿")
 
book.visualize()

1.4 订单簿深度分析

def analyze_order_book_depth(order_book, n_levels=10):
    """
    分析订单簿深度特征
 
    参数:
        order_book: OrderBook 实例
        n_levels: 分析的档位深度
 
    返回:
        深度特征字典
    """
    bid_levels, ask_levels = order_book.get_depth(n_levels)
 
    # 计算累计深度
    bid_cumulative = np.cumsum([qty for _, qty in bid_levels])
    ask_cumulative = np.cumsum([qty for _, qty in ask_levels])
 
    # 计算订单簿斜率(价格随数量的变化)
    if len(bid_levels) > 1:
        bid_prices = [price for price, _ in bid_levels]
        bid_slope = (bid_prices[0] - bid_prices[-1]) / bid_cumulative[-1]
    else:
        bid_slope = 0
 
    if len(ask_levels) > 1:
        ask_prices = [price for price, _ in ask_levels]
        ask_slope = (ask_prices[-1] - ask_prices[0]) / ask_cumulative[-1]
    else:
        ask_slope = 0
 
    # 订单簿不平衡
    total_bid_qty = sum(qty for _, qty in bid_levels)
    total_ask_qty = sum(qty for _, qty in ask_levels)
    imbalance = (total_bid_qty - total_ask_qty) / (total_bid_qty + total_ask_qty)
 
    return {
        'bid_depth': total_bid_qty,
        'ask_depth': total_ask_qty,
        'imbalance': imbalance,  # 正值表示买方更强
        'bid_slope': bid_slope,
        'ask_slope': ask_slope,
        'spread_pct': (order_book.get_spread() / order_book.get_mid_price()) * 100
    }
 
# 分析深度
depth_features = analyze_order_book_depth(book)
print("\n【订单簿深度分析】")
print("-" * 50)
for feature, value in depth_features.items():
    if 'imbalance' in feature or 'pct' in feature:
        print(f"{feature}: {value:.4f}")
    else:
        print(f"{feature}: {value:.2f}")

二、买卖价差 (Bid-Ask Spread)

2.1 价差的来源

买卖价差不是”免费的午餐”,而是对做市商的补偿:

来源说明影响因素
订单处理成本交易所费用、清算费用、系统成本交易量越大,单位成本越低
库存风险做市商持有头寸面临价格波动价格波动越大,价差越宽
信息不对称与知情交易者交易的损失信息不对称越严重,价差越宽

2.2 价差的度量

def calculate_spread_metrics(prices, volumes=None):
    """
    计算价差相关指标
 
    参数:
        prices: 包含 bid 和 ask 价格的 DataFrame
        volumes: (可选) 包含 bid 和 ask 数量的 DataFrame
 
    返回:
        价差指标字典
    """
    # 绝对价差
    absolute_spread = prices['ask'] - prices['bid']
 
    # 相对价差 (百分比)
    mid_price = (prices['bid'] + prices['ask']) / 2
    relative_spread = absolute_spread / mid_price
 
    # 对数价差
    log_spread = np.log(prices['ask'] / prices['bid'])
 
    metrics = {
        'absolute_spread_mean': absolute_spread.mean(),
        'absolute_spread_std': absolute_spread.std(),
        'relative_spread_mean': relative_spread.mean(),
        'relative_spread_std': relative_spread.std(),
        'log_spread_mean': log_spread.mean(),
    }
 
    # 如果有数量数据,计算加权价差
    if volumes is not None:
        # 加权中间价(考虑订单深度)
        total_volume = volumes['bid'] + volumes['ask']
        weighted_mid = (
            prices['bid'] * volumes['ask'] +
            prices['ask'] * volumes['bid']
        ) / total_volume
 
        metrics['weighted_mid_price'] = weighted_mid.mean()
 
    return metrics

2.3 Roll 价差估计器

当我们只有成交价格数据时,可以用 Roll (1984) 模型估计价差:

其中 是半价差, 是真实价格波动率。

推导出:

def roll_spread_estimator(price_changes):
    """
    Roll 价差估计器
 
    当只有成交价格时,估计有效价差
 
    参数:
        price_changes: 价格变化序列
 
    返回:
        估计的价差 (绝对值)
    """
    # 计算一阶自协方差
    cov_lag1 = price_changes.autocorr(lag=1)
 
    if cov_lag1 >= 0:
        # 自协方差为正,无法估计(可能价格序列有问题)
        return None
 
    # Roll 公式
    spread = np.sqrt(-cov_lag1)
 
    return spread
 
# 模拟价格数据测试 Roll 估计器
np.random.seed(42)
n_obs = 1000
 
# 真实价差
true_spread = 0.02
 
# 生成真实价格(随机游走)
true_price = pd.Series(
    np.cumsum(np.random.normal(0, 0.01, n_obs)),
    index=range(n_obs)
)
 
# 生成观测价格(在买价和卖价之间跳跃)
observed_prices = []
current_mid = 100  # 初始中间价
 
for i in range(n_obs):
    # 价格在 bid 和 ask 之间跳跃
    if i % 2 == 0:
        # 买价
        observed_prices.append(current_mid - true_spread / 2)
    else:
        # 卖价
        observed_prices.append(current_mid + true_spread / 2)
    current_mid += true_price.iloc[i]
 
observed_prices = pd.Series(observed_prices)
price_changes = observed_prices.diff().dropna()
 
# 估计价差
estimated_spread = roll_spread_estimator(price_changes)
 
print("\n【Roll 价差估计】")
print("-" * 40)
print(f"真实价差: {true_spread:.4f}")
print(f"估计价差: {estimated_spread:.4f}" if estimated_spread else "无法估计(自协方差为正)")
print("-" * 40)

2.4 CORWIN-SCHULTZ 价差估计

另一种常用的价差估计方法,适用于高频数据:

def corwin_schultz_spread(high, low, window=1):
    """
    CORWIN-SCHULTZ 高低价差估计器
 
    参数:
        high: 最高价序列
        low: 最低价序列
        window: 窗口大小
 
    返回:
        估计的相对价差
    """
    # 计算高低价平方和
    def calculate_beta(high_t, high_t1, low_t, low_t1):
        numerator = (
            np.log(high_t / low_t) ** 2 +
            np.log(high_t1 / low_t1) ** 2
        )
        denominator = 2 / (window - 1)
        return numerator / denominator
 
    # 计算_gamma
    def calculate_gamma(high_t, high_t1, low_t, low_t1):
        return (
            np.log(max(high_t, high_t1) / min(low_t, low_t1)) ** 2 +
            np.log(max(high_t, high_t1) / min(low_t, low_t1)) ** 2
        ) / 2
 
    betas = []
    gammas = []
 
    for i in range(1, len(high) - window):
        beta = calculate_beta(
            high.iloc[i], high.iloc[i + window],
            low.iloc[i], low.iloc[i + window]
        )
        gamma = calculate_gamma(
            high.iloc[i], high.iloc[i + window],
            low.iloc[i], low.iloc[i + window]
        )
 
        betas.append(beta)
        gammas.append(gamma)
 
    betas = pd.Series(betas)
    gammas = pd.Series(gammas)
 
    # 计算_alpha
    alpha = (np.sqrt(2 * betas) - np.sqrt(gammas)) / (2 - np.sqrt(2))
 
    # 估计价差
    spread = 2 * (np.exp(alpha) - 1) / (1 + np.exp(alpha))
 
    return spread.mean()

三、价格冲击模型

3.1 价格冲击的来源

交易对价格的影响路径:

大单交易 → 消耗订单簿流动性 → 价格不利移动 → 价格冲击

两种冲击类型:
├── 临时冲击: 流动性成本,随时间恢复
└── 永久冲击: 信息释放,价格永久重估

3.2 Kyle 模型 (1985)

其中 是价格冲击系数。

def estimate_kyle_lambda(prices, volumes, direction):
    """
    估计 Kyle 价格冲击系数
 
    参数:
        prices: 价格序列
        volumes: 成交量序列
        direction: 交易方向 (+1 买入, -1 卖出)
 
    返回:
        lambda 系数 (每单位交易量的价格冲击)
    """
    # 计算价格变化
    price_changes = prices.diff().dropna()
 
    # 计算交易强度(方向 * 成交量)
    trade_intensity = direction * volumes
 
    # 对齐数据
    aligned_data = pd.DataFrame({
        'price_change': price_changes,
        'trade_intensity': trade_intensity
    }).dropna()
 
    # 回归: 价格变化 = λ * 交易强度 + 残差
    from sklearn.linear_model import LinearRegression
 
    X = aligned_data[['trade_intensity']].values
    y = aligned_data['price_change'].values
 
    model = LinearRegression(fit_intercept=True)
    model.fit(X, y)
 
    kyle_lambda = model.coef_[0]
 
    # 计算 R²
    r_squared = model.score(X, y)
 
    return {
        'lambda': kyle_lambda,
        'r_squared': r_squared,
        'intercept': model.intercept_
    }
 
# 模拟价格冲击数据
np.random.seed(42)
n_obs = 500
 
# 生成交易数据
volumes = np.random.lognormal(10, 1, n_obs)  # 成交量
direction = np.random.choice([1, -1], n_obs)  # 交易方向
 
# 真实的价格冲击系数
true_lambda = 0.0001
 
# 价格 = 基础价格 + 冲击 + 噪声
base_price_change = np.random.normal(0, 0.01, n_obs)
price_impact = true_lambda * direction * volumes
price_changes = base_price_change + price_impact
 
prices = 100 + np.cumsum(price_changes)
 
# 估计 Kyle Lambda
result = estimate_kyle_lambda(
    pd.Series(prices),
    pd.Series(volumes),
    pd.Series(direction)
)
 
print("\n【Kyle 价格冲击模型】")
print("-" * 50)
print(f"真实 λ: {true_lambda:.6f}")
print(f"估计 λ: {result['lambda']:.6f}")
print(f"R²: {result['r_squared']:.4f}")
print("-" * 50)
 
# 可视化价格冲击
plt.figure(figsize=(12, 5))
 
plt.subplot(1, 2, 1)
plt.scatter(
    direction * volumes,
    price_changes,
    alpha=0.5,
    s=20,
    label='实际数据'
)
# 添加拟合线
x_line = np.linspace(min(direction * volumes), max(direction * volumes), 100)
y_line = result['intercept'] + result['lambda'] * x_line
plt.plot(x_line, y_line, 'r-', linewidth=2, label=f'拟合线 (λ={result["lambda"]:.6f})')
plt.xlabel('交易强度(方向 × 成交量)')
plt.ylabel('价格变化')
plt.title('Kyle 模型: 交易量 vs 价格冲击')
plt.legend()
plt.grid(True, alpha=0.3)
 
plt.subplot(1, 2, 2)
# 累计价格冲击
cumulative_impact = np.cumsum(price_impact)
cumulative_base = np.cumsum(base_price_change)
plt.plot(cumulative_base, label='基础价格变化', alpha=0.7)
plt.plot(cumulative_base + cumulative_impact, label='实际价格(含冲击)', linewidth=2)
plt.xlabel('时间')
plt.ylabel('累计价格变化')
plt.title('价格冲击的累积效应')
plt.legend()
plt.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

3.3 Amihud 非流动性指标

其中 是天数。

def calculate_amihud_illiquidity(returns, volumes, scaling_factor=1e9):
    """
    计算 Amihud 非流动性指标
 
    参数:
        returns: 日收益率序列
        volumes: 日成交量序列
        scaling_factor: 缩放因子(默认为 10^9)
 
    返回:
        Amihud 非流动性指标
    """
    # |收益率| / 成交量
    illiquidity_daily = returns.abs() / volumes
 
    # 平均并缩放
    amihud = illiquidity_daily.mean() * scaling_factor
 
    return amihud
 
# 计算示例
returns_sample = pd.Series(np.random.normal(0.001, 0.02, 100))
volumes_sample = pd.Series(np.random.lognormal(10, 1, 100))
 
amihud_value = calculate_amihud_illiquidity(returns_sample, volumes_sample)
 
print("\n【Amihud 非流动性指标】")
print("-" * 40)
print(f"Amihud 值: {amihud_value:.6f}")
print("解释: Amihud 值越大,流动性越差(单位成交量的价格冲击大)")
print("-" * 40)

四、知情交易者与流动性提供者

4.1 PIN (Probability of Informed Trading)

PIN 衡量市场中知情交易者的比例:

其中:

  • : 信息事件发生的概率
  • : 知情交易者的交易速率
  • : 不知情交易者的交易速率

4.2 订单流不平衡 (Order Flow Imbalance)

def calculate_ofi(bid_prices, ask_prices, bid_volumes, ask_volumes):
    """
    计算订单流不平衡 (Order Flow Imbalance)
 
    OFI 反映了买卖压力的相对强度
 
    参数:
        bid_prices: 买价序列(多档位)
        ask_prices: 卖价序列(多档位)
        bid_volumes: 买量序列(多档位)
        ask_volumes: 卖量序列(多档位)
 
    返回:
        OFI 序列
    """
    ofi_values = []
 
    for t in range(1, len(bid_prices)):
        # 计算买方变化
        bid_change = 0
        # 价格提高(有利)
        bid_change += (bid_prices[t] > bid_prices[t-1]).sum() * bid_volumes[t][bid_prices[t] > bid_prices[t-1]].sum()
        # 价格降低(不利)
        bid_change -= (bid_prices[t] < bid_prices[t-1]).sum() * bid_volumes[t-1][bid_prices[t] < bid_prices[t-1]].sum()
        # 价格不变但量增加
        same_bid = bid_prices[t] == bid_prices[t-1]
        bid_change += (bid_volumes[t][same_bid] - bid_volumes[t-1][same_bid]).clip(lower=0).sum()
        bid_change -= (bid_volumes[t][same_bid] - bid_volumes[t-1][same_bid]).clip(upper=0).abs().sum()
 
        # 计算卖方变化(符号相反)
        ask_change = 0
        ask_change -= (ask_prices[t] < ask_prices[t-1]).sum() * ask_volumes[t][ask_prices[t] < ask_prices[t-1]].sum()
        ask_change += (ask_prices[t] > ask_prices[t-1]).sum() * ask_volumes[t-1][ask_prices[t] > ask_prices[t-1]].sum()
        same_ask = ask_prices[t] == ask_prices[t-1]
        ask_change += (ask_volumes[t][same_ask] - ask_volumes[t-1][same_ask]).clip(lower=0).sum()
        ask_change -= (ask_volumes[t][same_ask] - ask_volumes[t-1][same_ask]).clip(upper=0).abs().sum()
 
        # OFI = 买方变化 + 卖方变化
        ofi_values.append(bid_change + ask_change)
 
    return pd.Series(ofi_values)
 
# 简化版 OFI(只考虑最优买卖价)
def simple_ofi(mid_prices, returns):
    """
    简化的 OFI 计算方法
 
    使用价格变化方向作为订单流代理
    """
    # 价格上涨 = 买方压力大 = 正 OFI
    ofi = np.sign(returns) * returns.abs()
    return ofi
 
# 模拟 OFI 预测力测试
np.random.seed(42)
n_days = 500
 
# 生成 OFI 信号(正态分布)
ofi_signal = np.random.normal(0, 1, n_days)
 
# OFI 对未来收益有预测力
future_returns = (
    0.0005 +  # 基础收益
    0.001 * ofi_signal +  # OFI 的线性影响
    np.random.normal(0, 0.02, n_days)  # 噪声
)
 
# 测试预测力
from scipy.stats import pearsonr
 
correlation, p_value = pearsonr(ofi_signal, future_returns)
 
print("\n【订单流不平衡 (OFI) 预测力测试】")
print("-" * 60)
print(f"OFI 与未来收益的相关系数: {correlation:.4f}")
print(f"P 值: {p_value:.4f}")
print(f"{'显著' if p_value < 0.05 else '不显著'}")
print("-" * 60)
 
# 可视化
plt.figure(figsize=(12, 5))
 
plt.subplot(1, 2, 1)
# 将 OFI 分为 5 组,计算每组平均收益
ofi_quintiles = pd.qcut(pd.Series(ofi_signal), 5, labels=['Q1', 'Q2', 'Q3', 'Q4', 'Q5'])
returns_by_quintile = pd.DataFrame({
    'ofi_group': ofi_quintiles,
    'return': future_returns
}).groupby('ofi_group')['return'].mean()
 
colors = ['green' if x > 0 else 'red' for x in returns_by_quintile.values]
returns_by_quintile.plot(kind='bar', color=colors, edgecolor='black')
plt.axhline(0, color='black', linewidth=0.8)
plt.xlabel('OFI 分组(Q1=最负,Q5=最正)')
plt.ylabel('平均收益')
plt.title('OFI 与未来收益的关系')
plt.grid(True, alpha=0.3, axis='y')
 
plt.subplot(1, 2, 2)
plt.scatter(ofi_signal, future_returns, alpha=0.5, s=20)
z = np.polyfit(ofi_signal, future_returns, 1)
p = np.poly1d(z)
plt.plot(ofi_signal, p(ofi_signal), "r-", linewidth=2, label=f'拟合线 (斜率={z[0]:.4f})')
plt.xlabel('订单流不平衡 (OFI)')
plt.ylabel('未来收益')
plt.title('OFI 预测收益散点图')
plt.legend()
plt.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

五、VWAP / TWAP 执行算法

5.1 VWAP (Volume Weighted Average Price)

目标: 以接近市场成交量加权平均价的价格执行订单。

def calculate_vwap(prices, volumes):
    """
    计算 VWAP
 
    VWAP = Σ(价格 × 成交量) / Σ(成交量)
    """
    vwap = (prices * volumes).sum() / volumes.sum()
    return vwap
 
def vwap_execution_algorithm(target_quantity, historical_volumes, num_slices):
    """
    VWAP 执行算法
 
    将大单拆分成多个小单,按历史成交量比例分配
 
    参数:
        target_quantity: 目标交易量
        historical_volumes: 历史成交量模式
        num_slices: 拆分份数
 
    返回:
        每个时间片的交易量分配
    """
    # 归一化历史成交量
    volume_weights = historical_volumes / historical_volumes.sum()
 
    # 按权重分配目标交易量
    allocation = target_quantity * volume_weights
 
    # 调整为整数份
    allocation = (allocation / allocation.sum() * target_quantity).round()
 
    return allocation
 
# VWAP 算法模拟
np.random.seed(42)
n_periods = 100
 
# 模拟日内成交量模式(U 型:开盘和收盘成交量大)
time_pattern = np.exp(-((np.arange(n_periods) - n_periods/2)**2) / (2 * (n_periods/6)**2))
intraday_volumes = np.random.poisson(1000 * time_pattern)
 
# 目标交易量
target_qty = 50000
 
# 使用 VWAP 算法分配
vwap_allocation = vwap_execution_algorithm(
    target_qty,
    pd.Series(intraday_volumes),
    10
)
 
print("\n【VWAP 执行算法】")
print("-" * 60)
print(f"目标交易量: {target_qty:,}")
print(f"实际分配总量: {vwap_allocation.sum():,}")
print(f"\n时间片分配:")
for i, qty in enumerate(vwap_allocation[:10]):
    print(f"  时间片 {i+1}: {qty:,.0f} 股 ({qty/target_qty*100:.1f}%)")
print("-" * 60)
 
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
 
# VWAP 分配
ax1 = axes[0]
ax1.bar(range(len(vwap_allocation)), vwap_allocation, edgecolor='black', alpha=0.7)
ax1.set_xlabel('时间片')
ax1.set_ylabel('分配交易量')
ax1.set_title('VWAP 算法: 按成交量比例分配')
ax1.grid(True, alpha=0.3, axis='y')
 
# 与历史成交量模式对比
ax2 = axes[1]
ax2.plot(intraday_volumes, label='历史成交量模式', linewidth=2, color='blue')
ax2_twin = ax2.twinx()
# 归一化后对比
vwap_normalized = vwap_allocation.values / vwap_allocation.sum() * len(vwap_allocation) * intraday_volumes.mean()
ax2_twin.bar(range(len(vwap_allocation)), vwap_normalized,
             alpha=0.5, color='orange', label='VWAP 分配(归一化)')
ax2.set_xlabel('时间')
ax2.set_ylabel('成交量', color='blue')
ax2_twin.set_ylabel('VWAP 分配量', color='orange')
ax2.set_title('VWAP 分配与成交量模式匹配')
ax2.legend(loc='upper left')
ax2_twin.legend(loc='upper right')
ax2.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

5.2 TWAP (Time Weighted Average Price)

目标: 在给定时间内均匀分配交易量。

def twap_execution_algorithm(target_quantity, num_periods):
    """
    TWAP 执行算法
 
    将大单均匀分配到各个时间片
 
    参数:
        target_quantity: 目标交易量
        num_periods: 时间片数量
 
    返回:
        每个时间片的交易量分配
    """
    quantity_per_period = target_quantity / num_periods
 
    # 均匀分配
    allocation = np.full(num_periods, quantity_per_period)
 
    return allocation
 
# TWAP 算法示例
twap_allocation = twap_execution_algorithm(target_qty, 20)
 
print("\n【TWAP 执行算法】")
print("-" * 60)
print(f"目标交易量: {target_qty:,}")
print(f"时间片数: 20")
print(f"每片分配: {target_qty/20:,.0f} 股")
print("-" * 60)
 
# VWAP vs TWAP 对比
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
 
# VWAP
ax1 = axes[0]
ax1.bar(range(len(vwap_allocation)), vwap_allocation, edgecolor='black', alpha=0.7)
ax1.plot(intraday_volumes / intraday_volumes.sum() * target_qty,
         'r-', linewidth=2, label='成交量模式')
ax1.set_xlabel('时间片')
ax1.set_ylabel('交易量')
ax1.set_title('VWAP: 跟随成交量模式')
ax1.legend()
ax1.grid(True, alpha=0.3, axis='y')
 
# TWAP
ax2 = axes[1]
ax2.bar(range(len(twap_allocation)), twap_allocation, edgecolor='black', alpha=0.7)
ax2.axhline(target_qty/len(twap_allocation), color='red', linestyle='--',
            linewidth=2, label='目标量/时间片')
ax2.set_xlabel('时间片')
ax2.set_ylabel('交易量')
ax2.set_title('TWAP: 均匀分配')
ax2.legend()
ax2.grid(True, alpha=0.3, axis='y')
 
plt.tight_layout()
plt.show()

5.3 执行质量评估

def evaluate_execution_quality(execution_prices, benchmark_prices, execution_volumes):
    """
    评估执行算法质量
 
    指标:
    - VWAP 滑点
    - 执行短差
    - 市场冲击
    """
    # 计算 VWAP
    execution_vwap = (execution_prices * execution_volumes).sum() / execution_volumes.sum()
    benchmark_vwap = (benchmark_prices * execution_volumes).sum() / execution_volumes.sum()
 
    # 滑点
    slippage = execution_vwap - benchmark_vwap
    slippage_bps = slippage / benchmark_vwap * 10000  # 基点
 
    return {
        'execution_vwap': execution_vwap,
        'benchmark_vwap': benchmark_vwap,
        'slippage': slippage,
        'slippage_bps': slippage_bps
    }

六、实现波动率 (Realized Volatility)

6.1 定义

实现波动率 是利用高频数据计算的实际波动率:

其中 是高频收益率。

6.2 Python 实现

def calculate_realized_volatility(prices, frequency='5min'):
    """
    计算实现波动率
 
    参数:
        prices: 高频价格序列
        frequency: 重采样频率
 
    返回:
        实现波动率(年化)
    """
    # 重采样到指定频率
    sampled_prices = prices.resample(frequency).last()
 
    # 计算收益率
    returns = sampled_prices.pct_change().dropna()
 
    # 实现波动率 = 收益率平方和
    rv = (returns ** 2).sum()
 
    # 年化(假设 252 个交易日,每天 6.5 小时)
    if frequency == '5min':
        periods_per_day = 78  # 6.5 * 60 / 5
    elif frequency == '1min':
        periods_per_day = 390
    elif frequency == '1h':
        periods_per_day = 6.5
    else:
        periods_per_day = 1
 
    annualized_rv = rv * 252 / periods_per_day
 
    return np.sqrt(annualized_rv)
 
# 模拟高频数据
np.random.seed(42)
n_minutes = 390  # 一天 390 分钟
 
# 生成分钟级价格(几何布朗运动)
drift = 0.05 / 252 / 390  # 日收益率 5% / 390 分钟
vol = 0.2 / np.sqrt(252 * 390)  # 年化 20% 波动率
 
minute_returns = np.random.normal(drift, vol, n_minutes)
minute_prices = 100 * np.exp(np.cumsum(minute_returns))
 
# 计算不同频率的实现波动率
price_series = pd.Series(minute_prices)
 
rv_1min = calculate_realized_volatility(price_series, '1min')
rv_5min = calculate_realized_volatility(price_series, '5min')
rv_1h = calculate_realized_volatility(price_series, '1h')
 
print("\n【实现波动率计算】")
print("-" * 50)
print(f"1 分钟 RV: {rv_1min:.4f}")
print(f"5 分钟 RV: {rv_5min:.4f}")
print(f"1 小时 RV: {rv_1h:.4f}")
print("-" * 50)
 
# 双幂次变差 (Realized Bipower Variation) - 更稳健的波动率估计
def calculate_bipower_variation(returns):
    """
    双幂次变差
 
    对跳跃更稳健的波动率估计量
    """
    n = len(returns)
    bpv = (np.pi / 2) * (
        np.abs(returns.iloc[:-1].values) * np.abs(returns.iloc[1:].values)
    ).sum()
 
    return bpv
 
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
 
# 价格路径
ax1 = axes[0, 0]
ax1.plot(price_series.values, linewidth=1)
ax1.set_title('模拟日内高频价格路径')
ax1.set_xlabel('分钟')
ax1.set_ylabel('价格')
ax1.grid(True, alpha=0.3)
 
# 收益率分布
ax2 = axes[0, 1]
ax2.hist(minute_returns, bins=50, edgecolor='black', alpha=0.7)
ax2.axvline(0, color='red', linestyle='--', linewidth=2)
ax2.set_title('分钟收益率分布')
ax2.set_xlabel('收益率')
ax2.set_ylabel('频数')
ax2.grid(True, alpha=0.3)
 
# 滚动波动率
ax3 = axes[1, 0]
window = 30
rolling_vol = pd.Series(minute_returns).rolling(window).std() * np.sqrt(390 * 252)
ax3.plot(rolling_vol.values, linewidth=2)
ax3.axhline(0.2, color='red', linestyle='--', linewidth=2, label='真实波动率 20%')
ax3.set_title(f'滚动波动率({window}分钟窗口)')
ax3.set_xlabel('分钟')
ax3.set_ylabel('年化波动率')
ax3.legend()
ax3.grid(True, alpha=0.3)
 
# 累计平方收益(RV 构成)
ax4 = axes[1, 1]
cumulative_rv = np.cumsum(np.array(minute_returns) ** 2)
ax4.plot(cumulative_rv, linewidth=2, label='累计 RV')
# 添加拟合线
z = np.polyfit(range(len(cumulative_rv)), cumulative_rv, 1)
p = np.poly1d(z)
ax4.plot(range(len(cumulative_rv)), p(range(len(cumulative_rv))),
         'r--', linewidth=2, label='线性拟合')
ax4.set_title('累计平方收益(RV)')
ax4.set_xlabel('分钟')
ax4.set_ylabel('累计 RV')
ax4.legend()
ax4.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

七、微价格 (Microprice)

7.1 定义

微价格 是利用订单簿深度信息估计的公允价格:

7.2 Python 实现

def calculate_microprice(bid_price, ask_price, bid_volume, ask_volume):
    """
    计算微价格
 
    考虑订单簿深度的加权中间价
    """
    if (bid_volume + ask_volume) == 0:
        return (bid_price + ask_price) / 2
 
    microprice = (
        bid_price * ask_volume +
        ask_price * bid_volume
    ) / (bid_volume + ask_volume)
 
    return microprice
 
# 模拟数据
np.random.seed(42)
n_obs = 100
 
bid_prices = np.random.uniform(9.8, 10, n_obs)
ask_prices = bid_prices + np.random.uniform(0.01, 0.05, n_obs)
bid_volumes = np.random.uniform(100, 1000, n_obs)
ask_volumes = np.random.uniform(100, 1000, n_obs)
 
# 计算微价格
microprices = [
    calculate_microprice(bp, ap, bv, av)
    for bp, ap, bv, av in zip(bid_prices, ask_prices, bid_volumes, ask_volumes)
]
 
mid_prices = (bid_prices + ask_prices) / 2
 
# 比较微价格与中间价
print("\n【微价格 vs 中间价】")
print("-" * 60)
print(f"{'指标':<20} {'微价格':<15} {'中间价':<15}")
print("-" * 60)
print(f"{'均值':<20} {np.mean(microprices):<15.4f} {np.mean(mid_prices):<15.4f}")
print(f"{'标准差':<20} {np.std(microprices):<15.4f} {np.std(mid_prices):<15.4f}")
print("-" * 60)
print("微价格考虑了订单簿深度,更能反映真实交易成本")
 
# 可视化
plt.figure(figsize=(12, 5))
 
plt.subplot(1, 2, 1)
plt.plot(mid_prices, label='中间价', linewidth=2, alpha=0.7)
plt.plot(microprices, label='微价格', linewidth=2, alpha=0.7)
plt.xlabel('时间')
plt.ylabel('价格')
plt.title('微价格 vs 中间价')
plt.legend()
plt.grid(True, alpha=0.3)
 
plt.subplot(1, 2, 2)
plt.scatter(mid_prices, microprices, alpha=0.5, s=20)
min_val = min(min(mid_prices), min(microprices))
max_val = max(max(mid_prices), max(microprices))
plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2)
plt.xlabel('中间价')
plt.ylabel('微价格')
plt.title('微价格与中间价的关系')
plt.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

八、做市商行为分析

8.1 价差设置

做市商如何设置买卖价差?

8.2 库存管理

def market_maker_inventory_management(current_inventory, target_inventory, price_deviation):
    """
    做市商库存管理策略
 
    参数:
        current_inventory: 当前库存
        target_inventory: 目标库存(通常为 0)
        price_deviation: 当前价格偏离公允价值的程度
 
    返回:
        调整后的报价偏移
    """
    # 库存惩罚
    inventory_penalty = -0.01 * (current_inventory - target_inventory)
 
    # 报价偏移
    quote_adjustment = inventory_penalty + price_deviation * 0.5
 
    return quote_adjustment
 
# 模拟做市商行为
np.random.seed(42)
n_steps = 100
 
inventory = 0
target_inventory = 0
price_deviations = np.random.normal(0, 0.01, n_steps)
inventory_history = []
quote_adjustments = []
 
for deviation in price_deviations:
    # 随机成交(假设 50% 概率)
    if np.random.random() < 0.3:
        trade = np.random.choice([-1, 1])  # 买入或卖出
        inventory += trade
 
    # 计算报价调整
    adjustment = market_maker_inventory_management(inventory, target_inventory, deviation)
 
    inventory_history.append(inventory)
    quote_adjustments.append(adjustment)
 
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
 
# 库存变化
ax1 = axes[0]
ax1.plot(inventory_history, linewidth=2, label='库存')
ax1.axhline(0, color='red', linestyle='--', linewidth=2, label='目标库存')
ax1.fill_between(range(len(inventory_history)), 0, inventory_history,
                 where=np.array(inventory_history) > 0, alpha=0.3, color='green', label='多头头寸')
ax1.fill_between(range(len(inventory_history)), 0, inventory_history,
                 where=np.array(inventory_history) < 0, alpha=0.3, color='red', label='空头头寸')
ax1.set_xlabel('时间步')
ax1.set_ylabel('库存量')
ax1.set_title('做市商库存变化')
ax1.legend()
ax1.grid(True, alpha=0.3)
 
# 报价调整
ax2 = axes[1]
ax2.plot(quote_adjustments, linewidth=2, label='报价调整')
ax2.axhline(0, color='black', linestyle='--', linewidth=1)
ax2.fill_between(range(len(quote_adjustments)), 0, quote_adjustments,
                 where=np.array(quote_adjustments) > 0, alpha=0.3, color='green')
ax2.fill_between(range(len(quote_adjustments)), 0, quote_adjustments,
                 where=np.array(quote_adjustments) < 0, alpha=0.3, color='red')
ax2.set_xlabel('时间步')
ax2.set_ylabel('报价调整')
ax2.set_title('库存驱动的报价调整')
ax2.grid(True, alpha=0.3)
 
plt.tight_layout()
plt.show()

九、流动性度量综合

def comprehensive_liquidity_analysis(prices, volumes, bid_prices=None, ask_prices=None):
    """
    综合流动性分析
 
    参数:
        prices: 价格序列
        volumes: 成交量序列
        bid_prices: (可选) 买价序列
        ask_prices: (可选) 卖价序列
 
    返回:
        流动性指标字典
    """
    returns = prices.pct_change().dropna()
 
    metrics = {}
 
    # 1. Amihud 非流动性
    metrics['amihud'] = calculate_amihud_illiquidity(returns, volumes)
 
    # 2. Roll 价差
    if bid_prices is not None and ask_prices is not None:
        metrics['absolute_spread'] = (ask_prices - bid_prices).mean()
        metrics['relative_spread'] = ((ask_prices - bid_prices) / prices).mean()
    else:
        roll_spread = roll_spread_estimator(returns)
        metrics['roll_spread'] = roll_spread
 
    # 3. 买卖价差变异系数
    if bid_prices is not None and ask_prices is not None:
        spreads = ask_prices - bid_prices
        metrics['spread_cv'] = spreads.std() / spreads.mean()
 
    # 4. 成交量变异系数
    metrics['volume_cv'] = volumes.std() / volumes.mean()
 
    # 5. 价格影响回归
    price_impact = estimate_kyle_lambda(prices, volumes, pd.Series([1] * len(volumes)))
    metrics['kyle_lambda'] = price_impact['lambda']
 
    # 6. 流动性综合得分(归一化后平均)
    # 注意:这里简化处理,实际应用需要更复杂的权重设计
    liquidity_score = 1 / (
        1 + metrics['amihud'] +
        (metrics.get('relative_spread', 0) * 100) +
        (metrics.get('kyle_lambda', 0) * 10000)
    )
    metrics['liquidity_score'] = liquidity_score
 
    return metrics
 
# 综合分析示例
np.random.seed(42)
n_days = 500
 
prices = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0.0005, 0.02, n_days))))
volumes = pd.Series(np.random.lognormal(10, 0.5, n_days))
bid_prices = prices * (1 - np.random.uniform(0.001, 0.005, n_days))
ask_prices = prices * (1 + np.random.uniform(0.001, 0.005, n_days))
 
liquidity_metrics = comprehensive_liquidity_analysis(prices, volumes, bid_prices, ask_prices)
 
print("\n【综合流动性分析】")
print("=" * 60)
print(f"{'指标':<25} {'值':<20}")
print("-" * 60)
for metric, value in liquidity_metrics.items():
    if 'score' in metric:
        print(f"{metric:<25} {value:<20.4f}")
    elif 'amihud' in metric or 'lambda' in metric:
        print(f"{metric:<25} {value:<20.6f}")
    else:
        print(f"{metric:<25} {value:<20.4%}" if value < 1 else f"{metric:<25} {value:<20.4f}")
print("=" * 60)

十、高频数据特性

10.1 数据特征

特性说明处理方法
数据量大纳秒级时间戳,每日百万条记录分布式计算、采样
噪声微结构噪声(买卖价差跳跃等)已实现波动率、双幂次变差
非同步交易不同股票最后成交时间不同同步到共同网格
日内模式U 型成交量、开盘收盘波动大去季节性调整

10.2 高频特征构造

def construct_high_frequency_features(tick_data):
    """
    构造高频交易特征
 
    参数:
        tick_data: 包含时间戳、价格、成交量的 DataFrame
 
    返回:
        特征 DataFrame
    """
    features = pd.DataFrame(index=tick_data.index)
 
    # 1. 时间特征
    features['hour'] = tick_data.index.hour
    features['minute'] = tick_data.index.minute
    features['time_to_close'] = (
        tick_data.index - tick_data.index.normalize()
    ).dt.total_seconds() / 3600
 
    # 2. 价格动量特征(多时间尺度)
    for window in [5, 10, 30, 60]:
        features[f'return_{window}min'] = (
            tick_data['price'].pct_change(window)
        )
 
    # 3. 波动率特征
    for window in [5, 10, 30]:
        features[f'volatility_{window}min'] = (
            tick_data['price'].pct_change().rolling(window).std()
        )
 
    # 4. 成交量特征
    features['volume_ma_5'] = tick_data['volume'].rolling(5).mean()
    features['volume_ratio'] = (
        tick_data['volume'] / features['volume_ma_5']
    )
 
    # 5. 订单流不平衡(如果有买卖方向数据)
    if 'direction' in tick_data.columns:
        features['ofi'] = (
            tick_data['direction'] * tick_data['volume']
        ).rolling(10).sum()
 
    return features

十一、核心知识点总结

概念核心内容Python 实现
订单簿价格优先、时间优先、流动性深度OrderBook 类模拟
买卖价差订单处理成本、库存风险、信息不对称Roll 估计、CORWIN-SCHULTZ
价格冲击Kyle 模型、Amihud 指标线性回归估计
执行算法VWAP 跟随成交量、TWAP 均匀分配按比例/均匀拆单
实现波动率高频数据计算实际波动率收益率平方和
OFI订单流不平衡预测价格买卖压力度量
微价格考虑深度的加权中间价深度加权公式

关键公式:

买卖价差 = Ask - Bid
相对价差 = (Ask - Bid) / Mid Price
Kyle 模型: Δp = λ × Volume + ε
Amihud: ILLIQ = |R| / Volume
VWAP: Σ(P × V) / Σ(V)
OFI: 买方变化 + 卖方变化

实践要点:

  1. 交易成本是策略盈利的隐形杀手,必须量化评估
  2. 流动性差的资产需要更大的价差补偿
  3. 大单拆分是降低冲击的关键技术
  4. 高频数据需要特殊的处理方法

下一步: 继续阅读 03-衍生品与波动率.md,了解期权定价和波动率交易。