02-高频数据分析

预计学习时间:2.5 小时

难度:⭐⭐⭐

核心问题:Tick 数据长什么样?如何从毫秒级数据中提取有价值的信息?


从一个直觉出发

你打开某只股票的日线数据,看到”今天收盘价 10.02 元,成交量 50 万手”。信息很简洁。

但如果你打开 Tick 数据,你会看到类似这样的东西:

09:30:00.001  10.01  买入  100 股
09:30:00.001  10.02  买入  500 股
09:30:00.003  10.02  卖出  200 股
09:30:00.003  10.03  买入  100 股
09:30:00.005  10.01  卖出  300 股   ← 价格回去了?
09:30:00.005  10.01  买入  800 股   ← 同一毫秒内既有买又有卖?
...

Tick 数据是乱的、脏的、庞大的。 一只股票一天的 tick 数据可能有几十万到几百万条。直接拿来分析,你会得到错误的结果。

这一章就是教你如何处理和分析这种数据。


一、Tick 数据清洗

1.1 为什么需要清洗

Tick 数据的常见问题:

问题原因后果
价格异常跳变数据传输错误、系统异常虚假波动率
成交量为 0系统记录异常零收益率
重复记录多源数据合并人为高频
时间戳不准确挂钟时间 vs 事件时间乱序数据
成交价超出买卖价差数据合并时序错误不合理的成交

1.2 Python 清洗代码

import numpy as np
import pandas as pd
 
def clean_tick_data(df):
    """Tick 数据清洗函数
    df: DataFrame,包含列 [timestamp, price, volume, direction]
    """
    df = df.copy()
 
    # 1. 去除成交量为 0 的记录
    df = df[df['volume'] > 0]
 
    # 2. 去除价格为负或异常的记录
    price_median = df['price'].median()
    df = df[(df['price'] > 0) & (df['price'] < price_median * 2)]
 
    # 3. 价格跳变过滤:如果价格变动超过 5 倍标准差,视为异常
    returns = df['price'].pct_change().abs()
    threshold = returns.rolling(100).std().fillna(returns.std()) * 5
    mask = (returns < threshold) | (returns.isna())
    df = df[mask]
 
    # 4. 去除同一时间戳内的重复记录(保留最后一条)
    df = df.drop_duplicates(subset=['timestamp'], keep='last')
 
    # 5. 确保时间排序
    df = df.sort_values('timestamp').reset_index(drop=True)
 
    # 6. 时间戳去重(防止微秒级重复)
    # 同一毫秒内的记录,如果价格相同则合并
    df['time_ms'] = df['timestamp'].dt.floor('ms')
    df = df.groupby('time_ms').agg({
        'price': 'last',
        'volume': 'sum',
        'direction': 'last'
    }).reset_index()
 
    df = df.rename(columns={'time_ms': 'timestamp'})
    return df
 
# 模拟脏数据
np.random.seed(42)
n = 10000
timestamps = pd.date_range('2025-01-02 09:30:00', periods=n, freq='ms')
prices = 10.0 + np.cumsum(np.random.normal(0, 0.001, n))
volumes = np.random.exponential(200, n).astype(int)
 
# 注入异常
prices[100] = 100.0      # 价格异常跳变
volumes[200] = 0         # 零成交量
prices[300] = prices[299] # 重复时间戳同一价格
 
df = pd.DataFrame({
    'timestamp': timestamps,
    'price': prices,
    'volume': volumes,
    'direction': np.random.choice([-1, 1], n)
})
 
print(f"清洗前: {len(df)} 条")
df_clean = clean_tick_data(df)
print(f"清洗后: {len(df_clean)} 条")

1.3 事件时间 vs 挂钟时间

挂钟时间(Clock Time):
  09:30:00.001 → 09:30:00.002 → 09:30:00.003 → ...
  每个事件之间的间隔是"物理时间"

事件时间(Event Time):
  第 1 笔交易 → 第 2 笔交易 → 第 3 笔交易 → ...
  每个事件之间的间隔是"交易次数"

关键区别:
  - 午休时挂钟时间流逝但没有事件
  - 高频分析应该用事件时间(隔 N 笔交易算一个指标)
  - 日内模式分析应该用挂钟时间

二、日内模式

2.1 U 形曲线

股票市场的交易活跃度在一天内呈 U 形分布:

成交量
  │
  │  ██
  │  ██ ██
  │  ██ ██
  │  ██ ██  ██
  │  ██ ██  ██ ██
  │  ██ ██  ██ ██ ██
  └────────────────────── 时间
    开盘  午休  收盘
    活跃  冷清  活跃

原因

  • 开盘活跃:隔夜信息集中释放,投资者集中下单
  • 午间冷清:信息少,市场参与者减少
  • 收盘活跃:基金调仓、尾盘抢筹/出货

2.2 Python 可视化日内模式

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
# ============================================================
# 模拟日内 U 形成交量模式
# ============================================================
n_minutes = 240  # 4 小时 = 240 分钟
minutes = np.arange(n_minutes)
 
# U 形函数
def u_shape(x, n, a=2.0):
    """U 形曲线"""
    return 1 + a * ((x / n - 0.5) ** 2)
 
base_volume = u_shape(minutes, n_minutes, a=8)
base_volume /= base_volume.mean()  # 归一化到均值 1
 
# 生成模拟成交量
volume_pattern = base_volume * np.random.exponential(1.0, n_minutes)
 
# 已实现波动率的日内模式
# 波动率也在开盘和收盘时更高
vol_pattern = np.sqrt(base_volume) * np.random.exponential(0.02, n_minutes)
 
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
 
# 成交量模式
axes[0].bar(minutes, volume_pattern, width=1, alpha=0.7)
axes[0].plot(minutes, base_volume, 'r-', linewidth=2, label='理论 U 形')
axes[0].axvline(x=30, color='gray', linestyle='--', alpha=0.5, label='上午休市')
axes[0].axvline(x=120, color='gray', linestyle='--', alpha=0.5, label='午间休市')
axes[0].axvline(x=150, color='gray', linestyle='--', alpha=0.5, label='下午开盘')
axes[0].set_xlabel('交易分钟')
axes[0].set_ylabel('成交量')
axes[0].set_title('日内成交量 U 形模式')
axes[0].legend()
 
# 波动率模式
axes[1].bar(minutes, vol_pattern, width=1, alpha=0.7, color='orange')
axes[1].set_xlabel('交易分钟')
axes[1].set_ylabel('已实现波动率')
axes[1].set_title('日内波动率模式')
 
plt.tight_layout()
plt.show()

三、已实现波动率(Realized Volatility)

3.1 核心思想

传统方法用日收盘价估计波动率,但一天之内价格可能上下波动很多次。高频数据可以更精确地估计波动率

已实现波动率的核心公式(二次变差):

其中 是高频收益率(如每秒收益率)。

直觉:把一天内所有小幅波动的平方加起来。波动越大,RV 越大。

3.2 为什么高频数据更精确

日频估计:只能看到"今天收盘 - 昨天收盘"
  → 一天内的波动全被忽略了
  → 低估真实波动率

分钟频估计:一天 240 个数据点
  → 捕捉日内波动
  → 但仍有遗漏

秒频估计:一天 14400 个数据点
  → 非常精确
  → 但噪声变大

3.3 Python 实现

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
# ============================================================
# 模拟高频数据并计算已实现波动率
# ============================================================
n_days = 60
n_ticks_per_day = 390  # 每天约 390 分钟
 
# 每天的真实波动率
daily_vol = 0.015 + 0.01 * np.sin(np.linspace(0, 4 * np.pi, n_days))
 
daily_rv = []  # 已实现波动率
daily_close_vol = []  # 收盘价估计的波动率
 
price = 100.0
 
for d in range(n_days):
    sigma = daily_vol[d]
    # 高频收益率
    hf_returns = np.random.normal(0, sigma / np.sqrt(n_ticks_per_day), n_ticks_per_day)
 
    # 已实现波动率
    rv = np.sum(hf_returns ** 2)
    daily_rv.append(np.sqrt(rv * 252))  # 年化
 
    # 收盘价估计
    close_return = np.sum(hf_returns)
    daily_close_vol.append(abs(close_return) * np.sqrt(252) / np.sqrt(252 / n_days_per_day))
 
# 平滑真实波动率(用于对比)
true_annual_vol = daily_vol * np.sqrt(252)
 
# 可视化
fig, ax = plt.subplots(figsize=(14, 6))
 
ax.plot(true_annual_vol, label='真实年化波动率', linewidth=2)
ax.plot(daily_rv, label='已实现波动率估计', alpha=0.7)
ax.plot(daily_close_vol, label='收盘价波动率估计', alpha=0.5, linestyle='--')
 
ax.set_xlabel('交易日')
ax.set_ylabel('年化波动率')
ax.set_title('已实现波动率 vs 收盘价波动率')
ax.legend()
 
plt.tight_layout()
plt.show()
 
print(f"真实波动率均值: {np.mean(true_annual_vol):.4f}")
print(f"已实现波动率均值: {np.mean(daily_rv):.4f}")
print(f"收盘价波动率均值: {np.mean(daily_close_vol):.4f}")

3.4 已实现协方差矩阵

当你需要估计多个资产之间的相关性和波动率时,已实现协方差矩阵非常有用:

def realized_covariance(returns_matrix):
    """计算已实现协方差矩阵
    returns_matrix: (n_ticks, n_assets) 高频收益率矩阵
    """
    return returns_matrix.T @ returns_matrix
 
def realized_correlation(rcov_matrix):
    """从已实现协方差矩阵计算相关系数矩阵"""
    n = rcov_matrix.shape[0]
    d = np.sqrt(np.diag(rcov_matrix))
    corr = rcov_matrix / np.outer(d, d)
    return corr
 
# 模拟两只相关资产
n_ticks = 390
rho = 0.6  # 真实相关性
sigma1 = 0.02
sigma2 = 0.025
cov_true = rho * sigma1 * sigma2
 
L = np.array([[sigma1, 0], [cov_true, np.sqrt(sigma2**2 - cov_true**2)]])
hf_returns = np.random.normal(0, 1, (n_ticks, 2)) @ L.T
 
rcov = realized_covariance(hf_returns)
rcorr = realized_correlation(rcov)
 
print(f"真实相关性: {rho:.4f}")
print(f"已实现相关性估计: {rcorr[0, 1]:.4f}")

四、微观噪声模型

4.1 噪声对波动率估计的影响

在高频数据中,我们观察到的不是”真实价格”,而是”真实价格 + 噪声”:

其中 是真实对数价格, 是微观结构噪声(买卖价差跳跃、离散化误差等)。

噪声会导致已实现波动率高估。频率越高,噪声的影响越大。

  已实现波动率
  │
  │        ┌──────────── 噪声主导区
  │       /
  │      /
  │     / ──────────── 最优采样频率
  │    /
  │   /
  │  /
  │ / ──────────────── 信息主导区(低频)
  └──────────────────────── 采样频率
  低频                  高频

4.2 Zhang 两尺度模型

Zhang, Mykland, Ait-Sahalia (2005) 提出了一种处理噪声的方法——两尺度已实现波动率(TSRV):

def tsrv(prices, slow_freq=10, fast_freq=1):
    """两尺度已实现波动率(TSRV)
    prices: 对数价格序列
    slow_freq: 慢采样频率(每 N 笔取一个)
    fast_freq: 快采样频率(每笔)
    """
    n = len(prices)
 
    # 快尺度:用所有数据
    fast_returns = np.diff(prices)
    rv_fast = np.sum(fast_returns ** 2)
 
    # 慢尺度:每 slow_freq 笔取一个
    slow_prices = prices[::slow_freq]
    slow_returns = np.diff(slow_prices)
    rv_slow = np.sum(slow_returns ** 2)
 
    # 降噪 TSRV
    n_slow = len(slow_prices)
    n_fast = len(prices)
    c_n = (n_fast * (n_fast - slow_freq + 1) * (n_fast - slow_freq + 2)) / \
          (slow_freq ** 2 * n_slow * (n_slow - 1) * (n_slow - 2) + 1e-10)
 
    tsrv = (rv_fast - c_n * rv_slow / slow_freq)
    tsrv = max(tsrv, 0)  # 确保非负
 
    return tsrv
 
# 模拟带噪声的高频价格
np.random.seed(42)
n = 5000
sigma = 0.02
noise_sigma = 0.0005
 
# 真实价格
true_prices = np.cumsum(np.random.normal(0, sigma / np.sqrt(n), n))
# 观察到的价格 = 真实价格 + 噪声
observed_prices = true_prices + np.random.normal(0, noise_sigma, n)
 
# 计算 RV 和 TSRV
rv_naive = np.sum(np.diff(observed_prices) ** 2)
rv_tsrv = tsrv(observed_prices, slow_freq=5)
 
print(f"朴素 RV(高估): {rv_naive:.8f}")
print(f"TSRV(修正后): {rv_tsrv:.8f}")
print(f"真实 RV: {sigma**2:.8f}")

五、逐笔成交数据特征分析

5.1 成交量分布

金融市场的成交量分布不服从正态分布,而是厚尾分布——大部分交易是小单,但偶尔出现极大单。

import numpy as np
import matplotlib.pyplot as plt
 
np.random.seed(42)
 
# 模拟成交量数据(对数正态分布,更接近真实情况)
n = 100000
volumes = np.random.lognormal(mean=4, sigma=1.5, size=n)
 
# 大单阈值:95 分位数
threshold_95 = np.percentile(volumes, 95)
large_orders = volumes[volumes > threshold_95]
small_orders = volumes[volumes <= threshold_95]
 
print(f"总成交笔数: {n}")
print(f"大单笔数(>95分位): {len(large_orders)} ({len(large_orders)/n*100:.1f}%)")
print(f"大单占总成交量: {large_orders.sum()/volumes.sum()*100:.1f}%")
 
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
 
# 成交量直方图
axes[0].hist(volumes, bins=200, edgecolor='black', alpha=0.7)
axes[0].axvline(threshold_95, color='red', linestyle='--', label=f'95分位={threshold_95:.0f}')
axes[0].set_xlabel('成交量')
axes[0].set_ylabel('频次')
axes[0].set_title('成交量分布(厚尾)')
axes[0].legend()
 
# 对数成交量直方图
axes[1].hist(np.log(volumes), bins=100, edgecolor='black', alpha=0.7, color='green')
axes[1].set_xlabel('log(成交量)')
axes[1].set_ylabel('频次')
axes[1].set_title('对数成交量分布(近似正态)')
 
plt.tight_layout()
plt.show()

5.2 大单 vs 小单的统计特征

# 模拟大单和小单对价格的不同影响
np.random.seed(42)
n_trades = 10000
base_price = 100.0
price = base_price
prices = [price]
 
for i in range(n_trades):
    vol = np.random.lognormal(4, 1.5)
 
    # 大单对价格的影响更大
    if vol > threshold_95:
        impact = 0.001 * (vol / threshold_95)  # 大单有额外冲击
    else:
        impact = 0.0001  # 小单冲击很小
 
    # 随机方向
    direction = np.random.choice([-1, 1])
    price += direction * impact
    prices.append(price)
 
# 计算不同大小订单后的价格变动
vol_order = np.random.lognormal(4, 1.5, n_trades)
returns_after = np.diff(prices)
is_large = vol_order > threshold_95
 
print(f"小单后平均价格变动: {np.mean(returns_after[~is_large]):.6f}")
print(f"大单后平均价格变动: {np.mean(returns_after[is_large]):.6f}")
print(f"大单价格变动是小单的 {abs(np.mean(returns_after[is_large]) / (np.mean(returns_after[~is_large]) + 1e-10)):.1f} 倍")

六、订单簿重建与 Level-2 分析

6.1 从逐笔数据重建订单簿快照

真实交易中,交易所发送的是”订单簿变更事件”(增量数据)。要得到某个时刻的完整订单簿快照,需要不断应用变更。

import numpy as np
import pandas as pd
from collections import defaultdict
 
class SimpleOrderBook:
    """简化的订单簿重建器"""
 
    def __init__(self):
        self.bids = defaultdict(float)  # {price: volume}
        self.asks = defaultdict(float)
 
    def apply_snapshot(self, bid_levels, ask_levels):
        """应用全量快照"""
        self.bids.clear()
        self.asks.clear()
        for price, vol in bid_levels:
            if vol > 0:
                self.bids[price] = vol
        for price, vol in ask_levels:
            if vol > 0:
                self.asks[price] = vol
 
    def apply_update(self, side, price, volume):
        """应用增量更新"""
        if side == 'bid':
            if volume == 0:
                self.bids.pop(price, None)
            else:
                self.bids[price] = volume
        else:
            if volume == 0:
                self.asks.pop(price, None)
            else:
                self.asks[price] = volume
 
    def get_top_n(self, n=5):
        """获取前 N 档"""
        sorted_bids = sorted(self.bids.items(), reverse=True)[:n]
        sorted_asks = sorted(self.asks.items())[:n]
        return sorted_bids, sorted_asks
 
    def mid_price(self):
        best_bid = max(self.bids.keys()) if self.bids else 0
        best_ask = min(self.asks.keys()) if self.asks else 0
        if best_bid > 0 and best_ask > 0:
            return (best_bid + best_ask) / 2
        return None
 
    def order_book_imbalance(self, levels=5):
        """订单簿不平衡指标
        正值 = 买盘强,负值 = 卖盘强
        """
        total_bid = sum(vol for _, vol in sorted(self.bids.items(), reverse=True)[:levels])
        total_ask = sum(vol for _, vol in sorted(self.asks.items())[:levels])
        total = total_bid + total_ask
        if total == 0:
            return 0
        return (total_bid - total_ask) / total
 
# 示例使用
ob = SimpleOrderBook()
ob.apply_snapshot(
    bid_levels=[(10.00, 1200), (9.99, 2000), (9.98, 3500)],
    ask_levels=[(10.02, 800), (10.03, 1200), (10.04, 500)]
)
 
# 模拟增量更新
ob.apply_update('ask', 10.02, 0)      # 卖一被吃掉
ob.apply_update('bid', 10.00, 500)    # 买一被部分成交
ob.apply_update('ask', 10.05, 600)    # 新挂卖单
 
bids, asks = ob.get_top_n(5)
print("买盘:")
for p, v in bids:
    print(f"  {p:.2f}: {v}")
 
print("卖盘:")
for p, v in asks:
    print(f"  {p:.2f}: {v}")
 
print(f"中间价: {ob.mid_price():.2f}")
print(f"订单簿不平衡: {ob.order_book_imbalance():.4f}")

6.2 订单簿不平衡指标

订单簿不平衡(Order Book Imbalance, OBI)是一个简单但有效的短期价格预测指标:

  • OBI > 0:买盘强于卖盘,短期价格倾向上涨
  • OBI < 0:卖盘强于买盘,短期价格倾向下跌
  • OBI 接近 0:买卖平衡
# 模拟 OBI 的预测能力
np.random.seed(42)
n_snapshots = 1000
 
obi_values = []
future_returns = []
 
ob = SimpleOrderBook()
price = 100.0
 
for i in range(n_snapshots):
    # 模拟随机订单簿变化
    ob.apply_snapshot(
        bid_levels=[
            (price - 0.02 * j, np.random.randint(500, 3000))
            for j in range(1, 6)
        ],
        ask_levels=[
            (price + 0.02 * j, np.random.randint(500, 3000))
            for j in range(1, 6)
        ]
    )
 
    obi = ob.order_book_imbalance(levels=5)
    obi_values.append(obi)
 
    # 模拟短期价格变动(OBI 越大,上涨概率越大)
    ret = 0.0001 * obi + np.random.normal(0, 0.001)
    future_returns.append(ret)
 
    price *= (1 + ret)
 
# 分析 OBI 的预测能力
obi_values = np.array(obi_values)
future_returns = np.array(future_returns)
 
# 分组分析
obi_high = future_returns[obi_values > 0.2]
obi_low = future_returns[obi_values < -0.2]
obi_mid = future_returns[(obi_values >= -0.2) & (obi_values <= 0.2)]
 
print(f"OBI > 0.2 时平均收益: {np.mean(obi_high):.6f}")
print(f"OBI < -0.2 时平均收益: {np.mean(obi_low):.6f}")
print(f"|OBI| <= 0.2 时平均收益: {np.mean(obi_mid):.6f}")

七、本文件核心要点

主题关键方法核心洞察
Tick 清洗过滤异常、去重、排序脏数据导致虚假结论
U 形曲线成交量/波动率的日内分布开盘收盘活跃,午间冷清
已实现波动率RV = sum(r^2)高频数据更精确地估计波动率
微观噪声TSRV 两尺度模型采样频率越高,噪声影响越大
成交量分布对数正态/厚尾分布少量大单贡献了大量成交
订单簿分析OBI 不平衡指标买卖力量对比可预测短期价格

一句话总结:高频数据分析的核心挑战是”在噪声中提取信号”——频率越高信息越多,但噪声也越大。