02-高频数据分析
预计学习时间:2.5 小时
难度:⭐⭐⭐
核心问题:Tick 数据长什么样?如何从毫秒级数据中提取有价值的信息?
从一个直觉出发
你打开某只股票的日线数据,看到”今天收盘价 10.02 元,成交量 50 万手”。信息很简洁。
但如果你打开 Tick 数据,你会看到类似这样的东西:
09:30:00.001 10.01 买入 100 股
09:30:00.001 10.02 买入 500 股
09:30:00.003 10.02 卖出 200 股
09:30:00.003 10.03 买入 100 股
09:30:00.005 10.01 卖出 300 股 ← 价格回去了?
09:30:00.005 10.01 买入 800 股 ← 同一毫秒内既有买又有卖?
...
Tick 数据是乱的、脏的、庞大的。 一只股票一天的 tick 数据可能有几十万到几百万条。直接拿来分析,你会得到错误的结果。
这一章就是教你如何处理和分析这种数据。
一、Tick 数据清洗
1.1 为什么需要清洗
Tick 数据的常见问题:
| 问题 | 原因 | 后果 |
|---|---|---|
| 价格异常跳变 | 数据传输错误、系统异常 | 虚假波动率 |
| 成交量为 0 | 系统记录异常 | 零收益率 |
| 重复记录 | 多源数据合并 | 人为高频 |
| 时间戳不准确 | 挂钟时间 vs 事件时间 | 乱序数据 |
| 成交价超出买卖价差 | 数据合并时序错误 | 不合理的成交 |
1.2 Python 清洗代码
import numpy as np
import pandas as pd
def clean_tick_data(df):
"""Tick 数据清洗函数
df: DataFrame,包含列 [timestamp, price, volume, direction]
"""
df = df.copy()
# 1. 去除成交量为 0 的记录
df = df[df['volume'] > 0]
# 2. 去除价格为负或异常的记录
price_median = df['price'].median()
df = df[(df['price'] > 0) & (df['price'] < price_median * 2)]
# 3. 价格跳变过滤:如果价格变动超过 5 倍标准差,视为异常
returns = df['price'].pct_change().abs()
threshold = returns.rolling(100).std().fillna(returns.std()) * 5
mask = (returns < threshold) | (returns.isna())
df = df[mask]
# 4. 去除同一时间戳内的重复记录(保留最后一条)
df = df.drop_duplicates(subset=['timestamp'], keep='last')
# 5. 确保时间排序
df = df.sort_values('timestamp').reset_index(drop=True)
# 6. 时间戳去重(防止微秒级重复)
# 同一毫秒内的记录,如果价格相同则合并
df['time_ms'] = df['timestamp'].dt.floor('ms')
df = df.groupby('time_ms').agg({
'price': 'last',
'volume': 'sum',
'direction': 'last'
}).reset_index()
df = df.rename(columns={'time_ms': 'timestamp'})
return df
# 模拟脏数据
np.random.seed(42)
n = 10000
timestamps = pd.date_range('2025-01-02 09:30:00', periods=n, freq='ms')
prices = 10.0 + np.cumsum(np.random.normal(0, 0.001, n))
volumes = np.random.exponential(200, n).astype(int)
# 注入异常
prices[100] = 100.0 # 价格异常跳变
volumes[200] = 0 # 零成交量
prices[300] = prices[299] # 重复时间戳同一价格
df = pd.DataFrame({
'timestamp': timestamps,
'price': prices,
'volume': volumes,
'direction': np.random.choice([-1, 1], n)
})
print(f"清洗前: {len(df)} 条")
df_clean = clean_tick_data(df)
print(f"清洗后: {len(df_clean)} 条")1.3 事件时间 vs 挂钟时间
挂钟时间(Clock Time):
09:30:00.001 → 09:30:00.002 → 09:30:00.003 → ...
每个事件之间的间隔是"物理时间"
事件时间(Event Time):
第 1 笔交易 → 第 2 笔交易 → 第 3 笔交易 → ...
每个事件之间的间隔是"交易次数"
关键区别:
- 午休时挂钟时间流逝但没有事件
- 高频分析应该用事件时间(隔 N 笔交易算一个指标)
- 日内模式分析应该用挂钟时间
二、日内模式
2.1 U 形曲线
股票市场的交易活跃度在一天内呈 U 形分布:
成交量
│
│ ██
│ ██ ██
│ ██ ██
│ ██ ██ ██
│ ██ ██ ██ ██
│ ██ ██ ██ ██ ██
└────────────────────── 时间
开盘 午休 收盘
活跃 冷清 活跃
原因:
- 开盘活跃:隔夜信息集中释放,投资者集中下单
- 午间冷清:信息少,市场参与者减少
- 收盘活跃:基金调仓、尾盘抢筹/出货
2.2 Python 可视化日内模式
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.random.seed(42)
# ============================================================
# 模拟日内 U 形成交量模式
# ============================================================
n_minutes = 240 # 4 小时 = 240 分钟
minutes = np.arange(n_minutes)
# U 形函数
def u_shape(x, n, a=2.0):
"""U 形曲线"""
return 1 + a * ((x / n - 0.5) ** 2)
base_volume = u_shape(minutes, n_minutes, a=8)
base_volume /= base_volume.mean() # 归一化到均值 1
# 生成模拟成交量
volume_pattern = base_volume * np.random.exponential(1.0, n_minutes)
# 已实现波动率的日内模式
# 波动率也在开盘和收盘时更高
vol_pattern = np.sqrt(base_volume) * np.random.exponential(0.02, n_minutes)
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
# 成交量模式
axes[0].bar(minutes, volume_pattern, width=1, alpha=0.7)
axes[0].plot(minutes, base_volume, 'r-', linewidth=2, label='理论 U 形')
axes[0].axvline(x=30, color='gray', linestyle='--', alpha=0.5, label='上午休市')
axes[0].axvline(x=120, color='gray', linestyle='--', alpha=0.5, label='午间休市')
axes[0].axvline(x=150, color='gray', linestyle='--', alpha=0.5, label='下午开盘')
axes[0].set_xlabel('交易分钟')
axes[0].set_ylabel('成交量')
axes[0].set_title('日内成交量 U 形模式')
axes[0].legend()
# 波动率模式
axes[1].bar(minutes, vol_pattern, width=1, alpha=0.7, color='orange')
axes[1].set_xlabel('交易分钟')
axes[1].set_ylabel('已实现波动率')
axes[1].set_title('日内波动率模式')
plt.tight_layout()
plt.show()三、已实现波动率(Realized Volatility)
3.1 核心思想
传统方法用日收盘价估计波动率,但一天之内价格可能上下波动很多次。高频数据可以更精确地估计波动率。
已实现波动率的核心公式(二次变差):
其中 是高频收益率(如每秒收益率)。
直觉:把一天内所有小幅波动的平方加起来。波动越大,RV 越大。
3.2 为什么高频数据更精确
日频估计:只能看到"今天收盘 - 昨天收盘"
→ 一天内的波动全被忽略了
→ 低估真实波动率
分钟频估计:一天 240 个数据点
→ 捕捉日内波动
→ 但仍有遗漏
秒频估计:一天 14400 个数据点
→ 非常精确
→ 但噪声变大
3.3 Python 实现
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.random.seed(42)
# ============================================================
# 模拟高频数据并计算已实现波动率
# ============================================================
n_days = 60
n_ticks_per_day = 390 # 每天约 390 分钟
# 每天的真实波动率
daily_vol = 0.015 + 0.01 * np.sin(np.linspace(0, 4 * np.pi, n_days))
daily_rv = [] # 已实现波动率
daily_close_vol = [] # 收盘价估计的波动率
price = 100.0
for d in range(n_days):
sigma = daily_vol[d]
# 高频收益率
hf_returns = np.random.normal(0, sigma / np.sqrt(n_ticks_per_day), n_ticks_per_day)
# 已实现波动率
rv = np.sum(hf_returns ** 2)
daily_rv.append(np.sqrt(rv * 252)) # 年化
# 收盘价估计
close_return = np.sum(hf_returns)
daily_close_vol.append(abs(close_return) * np.sqrt(252) / np.sqrt(252 / n_days_per_day))
# 平滑真实波动率(用于对比)
true_annual_vol = daily_vol * np.sqrt(252)
# 可视化
fig, ax = plt.subplots(figsize=(14, 6))
ax.plot(true_annual_vol, label='真实年化波动率', linewidth=2)
ax.plot(daily_rv, label='已实现波动率估计', alpha=0.7)
ax.plot(daily_close_vol, label='收盘价波动率估计', alpha=0.5, linestyle='--')
ax.set_xlabel('交易日')
ax.set_ylabel('年化波动率')
ax.set_title('已实现波动率 vs 收盘价波动率')
ax.legend()
plt.tight_layout()
plt.show()
print(f"真实波动率均值: {np.mean(true_annual_vol):.4f}")
print(f"已实现波动率均值: {np.mean(daily_rv):.4f}")
print(f"收盘价波动率均值: {np.mean(daily_close_vol):.4f}")3.4 已实现协方差矩阵
当你需要估计多个资产之间的相关性和波动率时,已实现协方差矩阵非常有用:
def realized_covariance(returns_matrix):
"""计算已实现协方差矩阵
returns_matrix: (n_ticks, n_assets) 高频收益率矩阵
"""
return returns_matrix.T @ returns_matrix
def realized_correlation(rcov_matrix):
"""从已实现协方差矩阵计算相关系数矩阵"""
n = rcov_matrix.shape[0]
d = np.sqrt(np.diag(rcov_matrix))
corr = rcov_matrix / np.outer(d, d)
return corr
# 模拟两只相关资产
n_ticks = 390
rho = 0.6 # 真实相关性
sigma1 = 0.02
sigma2 = 0.025
cov_true = rho * sigma1 * sigma2
L = np.array([[sigma1, 0], [cov_true, np.sqrt(sigma2**2 - cov_true**2)]])
hf_returns = np.random.normal(0, 1, (n_ticks, 2)) @ L.T
rcov = realized_covariance(hf_returns)
rcorr = realized_correlation(rcov)
print(f"真实相关性: {rho:.4f}")
print(f"已实现相关性估计: {rcorr[0, 1]:.4f}")四、微观噪声模型
4.1 噪声对波动率估计的影响
在高频数据中,我们观察到的不是”真实价格”,而是”真实价格 + 噪声”:
其中 是真实对数价格, 是微观结构噪声(买卖价差跳跃、离散化误差等)。
噪声会导致已实现波动率高估。频率越高,噪声的影响越大。
已实现波动率
│
│ ┌──────────── 噪声主导区
│ /
│ /
│ / ──────────── 最优采样频率
│ /
│ /
│ /
│ / ──────────────── 信息主导区(低频)
└──────────────────────── 采样频率
低频 高频
4.2 Zhang 两尺度模型
Zhang, Mykland, Ait-Sahalia (2005) 提出了一种处理噪声的方法——两尺度已实现波动率(TSRV):
def tsrv(prices, slow_freq=10, fast_freq=1):
"""两尺度已实现波动率(TSRV)
prices: 对数价格序列
slow_freq: 慢采样频率(每 N 笔取一个)
fast_freq: 快采样频率(每笔)
"""
n = len(prices)
# 快尺度:用所有数据
fast_returns = np.diff(prices)
rv_fast = np.sum(fast_returns ** 2)
# 慢尺度:每 slow_freq 笔取一个
slow_prices = prices[::slow_freq]
slow_returns = np.diff(slow_prices)
rv_slow = np.sum(slow_returns ** 2)
# 降噪 TSRV
n_slow = len(slow_prices)
n_fast = len(prices)
c_n = (n_fast * (n_fast - slow_freq + 1) * (n_fast - slow_freq + 2)) / \
(slow_freq ** 2 * n_slow * (n_slow - 1) * (n_slow - 2) + 1e-10)
tsrv = (rv_fast - c_n * rv_slow / slow_freq)
tsrv = max(tsrv, 0) # 确保非负
return tsrv
# 模拟带噪声的高频价格
np.random.seed(42)
n = 5000
sigma = 0.02
noise_sigma = 0.0005
# 真实价格
true_prices = np.cumsum(np.random.normal(0, sigma / np.sqrt(n), n))
# 观察到的价格 = 真实价格 + 噪声
observed_prices = true_prices + np.random.normal(0, noise_sigma, n)
# 计算 RV 和 TSRV
rv_naive = np.sum(np.diff(observed_prices) ** 2)
rv_tsrv = tsrv(observed_prices, slow_freq=5)
print(f"朴素 RV(高估): {rv_naive:.8f}")
print(f"TSRV(修正后): {rv_tsrv:.8f}")
print(f"真实 RV: {sigma**2:.8f}")五、逐笔成交数据特征分析
5.1 成交量分布
金融市场的成交量分布不服从正态分布,而是厚尾分布——大部分交易是小单,但偶尔出现极大单。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# 模拟成交量数据(对数正态分布,更接近真实情况)
n = 100000
volumes = np.random.lognormal(mean=4, sigma=1.5, size=n)
# 大单阈值:95 分位数
threshold_95 = np.percentile(volumes, 95)
large_orders = volumes[volumes > threshold_95]
small_orders = volumes[volumes <= threshold_95]
print(f"总成交笔数: {n}")
print(f"大单笔数(>95分位): {len(large_orders)} ({len(large_orders)/n*100:.1f}%)")
print(f"大单占总成交量: {large_orders.sum()/volumes.sum()*100:.1f}%")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 成交量直方图
axes[0].hist(volumes, bins=200, edgecolor='black', alpha=0.7)
axes[0].axvline(threshold_95, color='red', linestyle='--', label=f'95分位={threshold_95:.0f}')
axes[0].set_xlabel('成交量')
axes[0].set_ylabel('频次')
axes[0].set_title('成交量分布(厚尾)')
axes[0].legend()
# 对数成交量直方图
axes[1].hist(np.log(volumes), bins=100, edgecolor='black', alpha=0.7, color='green')
axes[1].set_xlabel('log(成交量)')
axes[1].set_ylabel('频次')
axes[1].set_title('对数成交量分布(近似正态)')
plt.tight_layout()
plt.show()5.2 大单 vs 小单的统计特征
# 模拟大单和小单对价格的不同影响
np.random.seed(42)
n_trades = 10000
base_price = 100.0
price = base_price
prices = [price]
for i in range(n_trades):
vol = np.random.lognormal(4, 1.5)
# 大单对价格的影响更大
if vol > threshold_95:
impact = 0.001 * (vol / threshold_95) # 大单有额外冲击
else:
impact = 0.0001 # 小单冲击很小
# 随机方向
direction = np.random.choice([-1, 1])
price += direction * impact
prices.append(price)
# 计算不同大小订单后的价格变动
vol_order = np.random.lognormal(4, 1.5, n_trades)
returns_after = np.diff(prices)
is_large = vol_order > threshold_95
print(f"小单后平均价格变动: {np.mean(returns_after[~is_large]):.6f}")
print(f"大单后平均价格变动: {np.mean(returns_after[is_large]):.6f}")
print(f"大单价格变动是小单的 {abs(np.mean(returns_after[is_large]) / (np.mean(returns_after[~is_large]) + 1e-10)):.1f} 倍")六、订单簿重建与 Level-2 分析
6.1 从逐笔数据重建订单簿快照
真实交易中,交易所发送的是”订单簿变更事件”(增量数据)。要得到某个时刻的完整订单簿快照,需要不断应用变更。
import numpy as np
import pandas as pd
from collections import defaultdict
class SimpleOrderBook:
"""简化的订单簿重建器"""
def __init__(self):
self.bids = defaultdict(float) # {price: volume}
self.asks = defaultdict(float)
def apply_snapshot(self, bid_levels, ask_levels):
"""应用全量快照"""
self.bids.clear()
self.asks.clear()
for price, vol in bid_levels:
if vol > 0:
self.bids[price] = vol
for price, vol in ask_levels:
if vol > 0:
self.asks[price] = vol
def apply_update(self, side, price, volume):
"""应用增量更新"""
if side == 'bid':
if volume == 0:
self.bids.pop(price, None)
else:
self.bids[price] = volume
else:
if volume == 0:
self.asks.pop(price, None)
else:
self.asks[price] = volume
def get_top_n(self, n=5):
"""获取前 N 档"""
sorted_bids = sorted(self.bids.items(), reverse=True)[:n]
sorted_asks = sorted(self.asks.items())[:n]
return sorted_bids, sorted_asks
def mid_price(self):
best_bid = max(self.bids.keys()) if self.bids else 0
best_ask = min(self.asks.keys()) if self.asks else 0
if best_bid > 0 and best_ask > 0:
return (best_bid + best_ask) / 2
return None
def order_book_imbalance(self, levels=5):
"""订单簿不平衡指标
正值 = 买盘强,负值 = 卖盘强
"""
total_bid = sum(vol for _, vol in sorted(self.bids.items(), reverse=True)[:levels])
total_ask = sum(vol for _, vol in sorted(self.asks.items())[:levels])
total = total_bid + total_ask
if total == 0:
return 0
return (total_bid - total_ask) / total
# 示例使用
ob = SimpleOrderBook()
ob.apply_snapshot(
bid_levels=[(10.00, 1200), (9.99, 2000), (9.98, 3500)],
ask_levels=[(10.02, 800), (10.03, 1200), (10.04, 500)]
)
# 模拟增量更新
ob.apply_update('ask', 10.02, 0) # 卖一被吃掉
ob.apply_update('bid', 10.00, 500) # 买一被部分成交
ob.apply_update('ask', 10.05, 600) # 新挂卖单
bids, asks = ob.get_top_n(5)
print("买盘:")
for p, v in bids:
print(f" {p:.2f}: {v}")
print("卖盘:")
for p, v in asks:
print(f" {p:.2f}: {v}")
print(f"中间价: {ob.mid_price():.2f}")
print(f"订单簿不平衡: {ob.order_book_imbalance():.4f}")6.2 订单簿不平衡指标
订单簿不平衡(Order Book Imbalance, OBI)是一个简单但有效的短期价格预测指标:
- OBI > 0:买盘强于卖盘,短期价格倾向上涨
- OBI < 0:卖盘强于买盘,短期价格倾向下跌
- OBI 接近 0:买卖平衡
# 模拟 OBI 的预测能力
np.random.seed(42)
n_snapshots = 1000
obi_values = []
future_returns = []
ob = SimpleOrderBook()
price = 100.0
for i in range(n_snapshots):
# 模拟随机订单簿变化
ob.apply_snapshot(
bid_levels=[
(price - 0.02 * j, np.random.randint(500, 3000))
for j in range(1, 6)
],
ask_levels=[
(price + 0.02 * j, np.random.randint(500, 3000))
for j in range(1, 6)
]
)
obi = ob.order_book_imbalance(levels=5)
obi_values.append(obi)
# 模拟短期价格变动(OBI 越大,上涨概率越大)
ret = 0.0001 * obi + np.random.normal(0, 0.001)
future_returns.append(ret)
price *= (1 + ret)
# 分析 OBI 的预测能力
obi_values = np.array(obi_values)
future_returns = np.array(future_returns)
# 分组分析
obi_high = future_returns[obi_values > 0.2]
obi_low = future_returns[obi_values < -0.2]
obi_mid = future_returns[(obi_values >= -0.2) & (obi_values <= 0.2)]
print(f"OBI > 0.2 时平均收益: {np.mean(obi_high):.6f}")
print(f"OBI < -0.2 时平均收益: {np.mean(obi_low):.6f}")
print(f"|OBI| <= 0.2 时平均收益: {np.mean(obi_mid):.6f}")七、本文件核心要点
| 主题 | 关键方法 | 核心洞察 |
|---|---|---|
| Tick 清洗 | 过滤异常、去重、排序 | 脏数据导致虚假结论 |
| U 形曲线 | 成交量/波动率的日内分布 | 开盘收盘活跃,午间冷清 |
| 已实现波动率 | RV = sum(r^2) | 高频数据更精确地估计波动率 |
| 微观噪声 | TSRV 两尺度模型 | 采样频率越高,噪声影响越大 |
| 成交量分布 | 对数正态/厚尾分布 | 少量大单贡献了大量成交 |
| 订单簿分析 | OBI 不平衡指标 | 买卖力量对比可预测短期价格 |
一句话总结:高频数据分析的核心挑战是”在噪声中提取信号”——频率越高信息越多,但噪声也越大。