数据基础设施与算法执行
构建可靠的数据底座和高效的执行系统,是量化投资从研究到实盘的关键桥梁
Part 1:数据基础设施
1.1 存储选型对比
量化数据具有时序性强、读多写少、需要历史回溯等特点,存储选型直接影响系统性能。
| 存储方案 | 适用场景 | 优势 | 劣势 | 量化适用度 |
|---|---|---|---|---|
| PostgreSQL + TimescaleDB | 中小团队、全功能需求 | SQL 生态、时序扩展、免费 | 写入性能上限 | 高 |
| DuckDB | 本地分析、研究环境 | 零部署、列式、极快 | 不支持并发写入 | 高 |
| ClickHouse | 高频数据、大规模查询 | 列式存储、写入极快 | 运维复杂 | 中高 |
| Parquet 文件 | 历史数据存储、离线分析 | 压缩率高、跨平台、免费 | 不支持实时更新 | 高 |
| kdb+/q | 高频交易、专业量化 | 极致性能、原生时序 | 昂贵、学习曲线陡 | 中(大机构) |
| Redis | 缓存、实时行情 | 内存级速度 | 数据持久化弱 | 中(辅助层) |
| Arctic | 因子/研究数据管理 | Python 原生、版本控制 | 依赖 MongoDB | 中高 |
| AWS S3 + Athena | 云端、低成本归档 | 无限扩展、按需付费 | 查询延迟较高 | 中 |
实战建议:小团队推荐 DuckDB + Parquet 的组合,简单高效零运维。需要生产级数据库时,PostgreSQL + TimescaleDB 是最平衡的选择。
"""
DuckDB + Parquet 量化数据存储方案
DuckDB 是嵌入式的列式分析数据库,零配置,与 Parquet 天然兼容。
"""
import duckdb
import pandas as pd
import numpy as np
from pathlib import Path
# 创建内存数据库(或连接持久化文件)
con = duckdb.connect(":memory:") # 或 duckdb.connect("quant.duckdb")
# 示例:创建行情表并从 Parquet 文件加载
# 假设已有 parquet 文件: data/markets/daily/A股日线.parquet
con.execute("""
CREATE TABLE daily_quotes AS
SELECT * FROM read_parquet('data/markets/daily/*.parquet',
hive_partitioning=true)
WHERE date >= '2018-01-01'
""")
# 高效查询示例
df = con.execute("""
SELECT
stock_code,
date,
close,
volume,
-- 计算滚动 20 日收益率
(close / lag(close, 20) OVER (PARTITION BY stock_code ORDER BY date) - 1)
AS return_20d,
-- 计算 20 日成交量均值
avg(volume) OVER (
PARTITION BY stock_code
ORDER BY date
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) AS volume_ma20
FROM daily_quotes
WHERE stock_code IN ('000001.SZ', '600519.SH')
ORDER BY stock_code, date
""").df()
print(f"查询结果: {len(df)} 行, 耗时极低(列式引擎优势)")1.2 因子库设计
因子库是量化研究的核心资产管理系统,需要支持因子注册、版本控制、血缘追踪。
"""
因子库设计 — FactorLibrary
核心功能:
1. 因子注册:定义因子元数据(名称、描述、依赖、频率)
2. 因子计算:自动解析依赖关系,按拓扑排序执行
3. 版本管理:记录因子计算逻辑的变更历史
4. 血缘追踪:因子依赖关系图,便于影响分析
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Callable, Optional
from dataclasses import dataclass, field
@dataclass
class FactorMeta:
"""因子元数据"""
name: str # 因子名称(唯一标识)
description: str # 因子描述
category: str # 分类(value/momentum/volatility/quality)
frequency: str # 频率(daily/weekly/monthly)
dependencies: list = field(default_factory=list) # 依赖的因子/字段
version: str = "1.0" # 版本号
author: str = "" # 创建者
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
class FactorLibrary:
"""因子库管理器"""
def __init__(self, storage_path: str = "./factors/"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
self.factors: dict[str, FactorMeta] = {}
self.computers: dict[str, Callable] = {}
def register(self, meta: FactorMeta, compute_fn: Callable):
"""
注册因子
参数:
meta: 因子元数据
compute_fn: 计算函数,签名: fn(data: dict) -> pd.DataFrame
"""
if meta.name in self.factors:
print(f"警告: 因子 {meta.name} 已存在,将覆盖")
self.factors[meta.name] = meta
self.computers[meta.name] = compute_fn
print(f"已注册因子: {meta.name} (v{meta.version})")
def compute_factor(self, name: str, data: dict) -> pd.DataFrame:
"""计算单个因子"""
if name not in self.computers:
raise ValueError(f"因子 {name} 未注册")
# 检查依赖是否满足
meta = self.factors[name]
missing = [d for d in meta.dependencies if d not in data]
if missing:
raise ValueError(f"缺少依赖数据: {missing}")
result = self.computers[name](data)
return result
def save_factor(self, name: str, df: pd.DataFrame):
"""保存因子到 Parquet 文件"""
meta = self.factors[name]
factor_dir = self.storage_path / meta.category
factor_dir.mkdir(parents=True, exist_ok=True)
filepath = factor_dir / f"{name}_v{meta.version}.parquet"
df.to_parquet(filepath, index=False)
print(f"因子已保存: {filepath}")
def list_factors(self) -> pd.DataFrame:
"""列出所有已注册因子"""
records = []
for name, meta in self.factors.items():
records.append({
"name": name,
"category": meta.category,
"frequency": meta.frequency,
"version": meta.version,
"dependencies": ", ".join(meta.dependencies),
"author": meta.author,
})
return pd.DataFrame(records)
# 使用示例:注册常用因子
lib = FactorLibrary()
# 注册动量因子
lib.register(
meta=FactorMeta(
name="momentum_20d",
description="20日动量因子:过去20日收益率",
category="momentum",
frequency="daily",
dependencies=["close"],
),
compute_fn=lambda data: pd.DataFrame({
"date": data["close"]["date"],
"stock": data["close"]["stock"],
"momentum_20d": data["close"].groupby("stock")["close"]
.pct_change(20),
}),
)
# 注册波动率因子
lib.register(
meta=FactorMeta(
name="volatility_20d",
description="20日年化波动率",
category="volatility",
frequency="daily",
dependencies=["close"],
),
compute_fn=lambda data: pd.DataFrame({
"date": data["close"]["date"],
"stock": data["close"]["stock"],
"volatility_20d": data["close"].groupby("stock")["close"]
.pct_change().rolling(20).std() * np.sqrt(252),
}),
)
print(lib.list_factors())1.3 数据管道
数据管道将分散的数据源统一为干净、标准化的研究数据。
┌─────────────────────────────────────────────────────────────────────────────┐
│ 量化数据管道架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 行情数据 │ │ 财务数据 │ │ 另类数据 │ 数据源层 │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ 数据采集层 (Ingestion) │ │
│ │ 行情API / 财报爬虫 / 另类数据接口 │ │
│ └──────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ 数据清洗层 (Cleaning) │ │
│ │ 停牌过滤 / 异常值检测 / 复权处理 │ │
│ └──────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ 数据计算层 (Compute) │ │
│ │ 收益率 / 技术指标 / 因子计算 │ │
│ └──────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ 数据存储层 (Storage) │ │
│ │ Parquet / DuckDB / TimescaleDB │ │
│ └──────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ 数据服务层 (Serving) │ │
│ │ 研究查询 / 回测接口 / 实盘推送 │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
"""
数据管道实现 — 每日数据更新流程
"""
import pandas as pd
import numpy as np
from pathlib import Path
class DataPipeline:
"""量化数据管道"""
def __init__(self, raw_dir: str = "./data/raw/",
clean_dir: str = "./data/clean/",
factor_dir: str = "./data/factors/"):
self.raw_dir = Path(raw_dir)
self.clean_dir = Path(clean_dir)
self.factor_dir = Path(factor_dir)
# 确保目录存在
for d in [self.raw_dir, self.clean_dir, self.factor_dir]:
d.mkdir(parents=True, exist_ok=True)
def run_daily_update(self, trade_date: str):
"""
执行每日数据更新
步骤:
1. 采集原始数据
2. 清洗和标准化
3. 计算衍生指标
4. 质量校验
5. 存储
"""
print(f"[{trade_date}] 开始每日数据更新")
# Step 1: 采集(此处为示例,实际接入数据源 API)
raw_quotes = self._fetch_daily_quotes(trade_date)
# Step 2: 清洗
clean_quotes = self._clean_quotes(raw_quotes)
# Step 3: 计算衍生指标
enriched = self._compute_derived(clean_quotes)
# Step 4: 质量校验
issues = self._validate_data(enriched, trade_date)
if issues:
print(f"[警告] 数据质量问题: {issues}")
# Step 5: 存储
self._save_data(enriched, trade_date)
print(f"[{trade_date}] 数据更新完成")
def _fetch_daily_quotes(self, trade_date: str) -> pd.DataFrame:
"""获取日线行情(示例)"""
# 实际实现中调用行情 API
return pd.DataFrame()
def _clean_quotes(self, df: pd.DataFrame) -> pd.DataFrame:
"""清洗行情数据"""
# 去除停牌日(成交量为0且价格不变)
df = df[~((df["volume"] == 0) & (df["close"] == df["pre_close"]))]
# 去除异常值(涨跌幅超过20%且非ST)
df["pct_change"] = df["close"].pct_change()
df = df[df["pct_change"].between(-0.2, 0.2)]
return df
def _compute_derived(self, df: pd.DataFrame) -> pd.DataFrame:
"""计算衍生指标"""
df = df.sort_values(["stock", "date"])
df["return_1d"] = df.groupby("stock")["close"].pct_change(1)
df["return_5d"] = df.groupby("stock")["close"].pct_change(5)
df["turnover_rate"] = df["volume"] / df["shares_outstanding"]
return df
def _validate_data(self, df: pd.DataFrame, trade_date: str) -> list:
"""数据质量校验(见 1.5 节)"""
issues = []
if df.empty:
issues.append("数据为空")
if df["stock"].nunique() < 100:
issues.append(f"股票数量异常: {df['stock'].nunique()}")
return issues
def _save_data(self, df: pd.DataFrame, trade_date: str):
"""保存清洗后的数据"""
# 按日期分区存储为 Parquet
partition_dir = self.clean_dir / f"date={trade_date}"
partition_dir.mkdir(exist_ok=True)
df.to_parquet(partition_dir / "daily_quotes.parquet", index=False)1.4 数据清洗与复权
复权是量化数据处理中最基础也是最容易出错的操作之一。
"""
数据清洗与复权处理
复权类型:
- 前复权 (Forward Adjusted): 以最新价格为基准,历史价格向下调整
→ 保持最新价格不变,历史 K 线连续
→ 适合技术分析和回测(推荐)
- 后复权 (Backward Adjusted): 以 IPO 价格为基准,当前价格向上调整
→ 保持历史价格不变,最新价格可能很高
→ 适合看历史涨幅
- 不复权: 使用真实交易价格
→ 除权除息日会产生价格跳空
→ 适合执行层面分析
"""
import pandas as pd
import numpy as np
def compute_adj_factor(df: pd.DataFrame) -> pd.DataFrame:
"""
计算复权因子
输入 DataFrame 需要包含:
date, stock, open, high, low, close, volume
dividend: 分红金额(每股)
split_ratio: 拆股比例(如 10送10 则为 2.0)
输出:
adj_factor: 复权因子序列
"""
df = df.sort_values(["stock", "date"]).copy()
# 累计复权因子
df["adj_factor"] = 1.0
for stock in df["stock"].unique():
mask = df["stock"] == stock
stock_df = df[mask].copy()
stock_df["adj_factor"] = 1.0
# 从最后一天向前累计
for i in range(len(stock_df) - 2, -1, -1):
current = stock_df.iloc[i + 1]
prev = stock_df.iloc[i]
# 复权因子 = (前一日收盘价 / 后一日复权后价格)
# 简化处理:只考虑分红和拆股
dividend = prev.get("dividend", 0) or 0
split = prev.get("split_ratio", 1.0) or 1.0
stock_df.iloc[i, stock_df.columns.get_loc("adj_factor")] = (
stock_df.iloc[i + 1]["adj_factor"]
* (current["close"] / (current["close"] * split - dividend))
)
df.loc[mask, "adj_factor"] = stock_df["adj_factor"].values
return df
def forward_adjust(df: pd.DataFrame) -> pd.DataFrame:
"""
前复权处理
前复权公式: adj_price = raw_price * adj_factor
最新 adj_factor = 1.0,历史 adj_factor > 1.0(历史价格被调低)
"""
adj_df = compute_adj_factor(df).copy()
# 前复权:价格除以复权因子
for col in ["open", "high", "low", "close"]:
adj_df[f"adj_{col}"] = adj_df[col] / adj_df["adj_factor"]
# 成交量乘以复权因子(保持成交额不变)
adj_df["adj_volume"] = adj_df["volume"] * adj_df["adj_factor"]
# 计算前复权收益率(等价于使用真实价格计算的收益率)
adj_df["adj_return"] = adj_df.groupby("stock")["adj_close"].pct_change()
return adj_df1.5 数据质量校验
数据质量是量化系统的生命线,垃圾数据只会产出垃圾策略。
| 校验维度 | 检查项 | 阈值建议 | 处理方式 |
|---|---|---|---|
| 完整性 | 每日股票数量 | > 90% 历史均值 | 告警 + 手动检查 |
| 一致性 | OHLC 关系 | Low ⇐ Close ⇐ High | 修正或标记 |
| 时效性 | 数据更新时间 | 交易日 18:00 前 | 告警 |
| 准确性 | 涨跌幅匹配 | abs(calc - reported) < 0.01% | 修正 |
| 连续性 | 价格跳变 | 单日涨跌幅 < 30%(非ST) | 标记异常 |
| 唯一性 | 去重检查 | 无重复记录 | 去重 |
"""
数据质量校验框架
"""
import pandas as pd
import numpy as np
from dataclasses import dataclass
@dataclass
class QualityIssue:
"""数据质量问题"""
category: str # 问题类别
severity: str # 严重程度(error/warning/info)
description: str # 问题描述
affected_records: int # 影响记录数
class DataQualityChecker:
"""数据质量检查器"""
def __init__(self, rules: dict = None):
self.rules = rules or {}
self.issues: list = []
def check_completeness(self, df: pd.DataFrame,
date_col: str, stock_col: str,
expected_stocks: int = 4000) -> list:
"""完整性检查:股票数量是否异常"""
issues = []
daily_counts = df.groupby(date_col)[stock_col].nunique()
mean_count = daily_counts.mean()
std_count = daily_counts.std()
for date, count in daily_counts.items():
if count < mean_count - 3 * std_count:
issues.append(QualityIssue(
category="completeness",
severity="error",
description=f"{date} 仅 {count} 只股票,期望约 {int(mean_count)}",
affected_records=expected_stocks - count,
))
return issues
def check_ohlc_consistency(self, df: pd.DataFrame) -> list:
"""OHLC 一致性检查"""
issues = []
# 检查 Low <= Close <= High
invalid_high = df[df["close"] > df["high"]]
invalid_low = df[df["close"] < df["low"]]
invalid_range = df[df["low"] > df["high"]]
if len(invalid_high) > 0:
issues.append(QualityIssue(
category="consistency",
severity="error",
description=f"Close > High: {len(invalid_high)} 条记录",
affected_records=len(invalid_high),
))
if len(invalid_low) > 0:
issues.append(QualityIssue(
category="consistency",
severity="error",
description=f"Close < Low: {len(invalid_low)} 条记录",
affected_records=len(invalid_low),
))
return issues
def check_price_outliers(self, df: pd.DataFrame,
max_change: float = 0.30) -> list:
"""价格异常检查(排除 ST)"""
issues = []
df["pct_change"] = df.groupby("stock")["close"].pct_change()
outliers = df[abs(df["pct_change"]) > max_change]
if len(outliers) > 0:
issues.append(QualityIssue(
category="accuracy",
severity="warning",
description=f"涨跌幅超过 {max_change:.0%}: {len(outliers)} 条记录",
affected_records=len(outliers),
))
return issues
def run_all_checks(self, df: pd.DataFrame) -> list:
"""执行所有检查"""
all_issues = []
all_issues.extend(self.check_completeness(df, "date", "stock"))
all_issues.extend(self.check_ohlc_consistency(df))
all_issues.extend(self.check_price_outliers(df))
# 汇总报告
errors = sum(1 for i in all_issues if i.severity == "error")
warnings = sum(1 for i in all_issues if i.severity == "warning")
print(f"质量检查完成: {errors} 个错误, {warnings} 个警告")
for issue in all_issues:
print(f" [{issue.severity.upper()}] {issue.category}: {issue.description}")
return all_issues1.6 Parquet 格式
Parquet 是量化数据存储的事实标准格式,理解其设计原理有助于高效使用。
| 特性 | 说明 | 量化场景优势 |
|---|---|---|
| 列式存储 | 按列而非按行存储数据 | 只读取需要的列,IO 减少 10-100x |
| 压缩编码 | 列级压缩(Snappy/ZSTD/GZIP) | 量化数据压缩比 5-20x |
| 类型保留 | 保留原始数据类型(int32/float64) | 避免字符串解析开销 |
| 分区裁剪 | 按日期/股票分区,查询自动跳过 | 查询特定日期极快 |
| 谓词下推 | 过滤条件下推到存储层 | 减少读取数据量 |
| Schema 演进 | 支持新增列,兼容旧文件 | 因子迭代不影响历史数据 |
"""
Parquet 文件操作最佳实践
"""
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
# 写入 Parquet — 最佳实践
def write_parquet_optimized(df: pd.DataFrame, filepath: str,
partition_cols: list = None):
"""
优化写入 Parquet 文件
关键优化:
1. 指定列类型(减少存储空间)
2. 使用分区(加速查询)
3. 选择压缩算法(ZSTD 平衡速度和压缩率)
"""
# 优化数据类型
dtypes = {
"date": "datetime64[ns]",
"stock": "category", # 分类列用字典编码
"open": "float32", # float32 足够精度
"high": "float32",
"low": "float32",
"close": "float32",
"volume": "int64",
"turnover": "float64", # 成交额需要更高精度
}
for col, dtype in dtypes.items():
if col in df.columns:
df[col] = df[col].astype(dtype)
# 写入(使用 ZSTD 压缩,行组大小 100000 行)
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(
table,
root_path=filepath,
partition_cols=partition_cols,
compression="zstd",
row_group_size=100_000,
use_dictionary=True,
)
# 读取 Parquet — 带过滤条件
def read_parquet_filtered(filepath: str, date_range: tuple = None,
stocks: list = None) -> pd.DataFrame:
"""
高效读取 Parquet(谓词下推)
只有满足条件的数据会被读入内存,大幅减少 IO
"""
filters = []
if date_range:
filters.append(("date", ">=", date_range[0]))
filters.append(("date", "<=", date_range[1]))
if stocks:
filters.append(("stock", "in", stocks))
return pd.read_parquet(filepath, filters=filters if filters else None)Part 2:算法执行系统
2.1 TWAP / VWAP 执行算法
TWAP(时间加权平均价格)和 VWAP(成交量加权平均价格)是最基础的执行算法。
"""
TWAP 和 VWAP 执行算法
TWAP: 将订单均匀拆分到时间窗口内执行
VWAP: 参考历史成交量分布拆分订单,跟随市场节奏
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass
@dataclass
class ExecutionOrder:
"""执行订单"""
stock: str
side: str # buy / sell
total_qty: float # 总数量
start_time: str
end_time: str
algo: str # twap / vwap
limit_price: float # 限价
filled_qty: float = 0
filled_price: float = 0
child_orders: list = None
class TWAPExecutor:
"""
TWAP 执行器
策略: 将大单均匀拆分为 N 个时间片,每个时间片执行等量订单
优势: 简单、可预测、市场冲击小
劣势: 不考虑成交量变化,可能在低流动性时段执行
"""
def __init__(self, num_slices: int = 20):
self.num_slices = num_slices
def generate_schedule(self, order: ExecutionOrder) -> pd.DataFrame:
"""
生成 TWAP 执行计划
返回: 每个时间片的执行量
"""
total_minutes = self._time_diff_minutes(order.start_time, order.end_time)
slice_minutes = total_minutes / self.num_slices
qty_per_slice = order.total_qty / self.num_slices
schedule = []
for i in range(self.num_slices):
schedule.append({
"slice": i + 1,
"time": self._add_minutes(order.start_time, i * slice_minutes),
"target_qty": qty_per_slice,
"cumulative_qty": qty_per_slice * (i + 1),
})
return pd.DataFrame(schedule)
class VWAPExecutor:
"""
VWAP 执行器
策略: 参考历史同时段成交量分布,在高成交量的时段多执行
优势: 更好地融入市场,降低市场冲击
劣势: 需要历史成交量分布数据
"""
def __init__(self, volume_profile: pd.Series):
"""
参数:
volume_profile: 历史成交量分布(按时段归一化)
index: 时段 (如 "09:30", "10:00")
value: 该时段成交量占比
"""
self.volume_profile = volume_profile / volume_profile.sum()
def generate_schedule(self, order: ExecutionOrder,
current_time: str = None) -> pd.DataFrame:
"""生成 VWAP 执行计划"""
if current_time is None:
current_time = order.start_time
# 剩余时段的成交量分布
remaining_profile = self.volume_profile[self.volume_profile.index >= current_time]
remaining_profile = remaining_profile / remaining_profile.sum()
schedule = []
for time_slot, ratio in remaining_profile.items():
schedule.append({
"time": time_slot,
"volume_ratio": ratio,
"target_qty": order.total_qty * ratio,
})
df = pd.DataFrame(schedule)
df["cumulative_qty"] = df["target_qty"].cumsum()
return df2.2 Almgren-Chriss 最优执行
Almgren-Chriss 模型是算法执行的理论基石,在市场冲击和时机风险之间寻求最优平衡。
核心思想:
- 快速执行 → 高市场冲击,低时机风险
- 缓慢执行 → 低市场冲击,高时机风险
- 最优策略:在两者之间找到平衡
目标函数:
min E[C] + λ * Var[C]
其中:
C: 总执行成本 = 市场冲击 + 时机成本
λ: 风险厌恶参数
E[C]: 期望执行成本(主要由永久冲击决定)
Var[C]: 执行成本方差(主要由价格波动决定)
"""
Almgren-Chriss 最优执行框架
模型参数:
σ: 资产日波动率
η: 临时冲击系数(每股交易对价格的即时影响)
γ: 永久冲击系数(交易对基本面价格的影响)
T: 总执行时间(天)
N: 时间步数
X: 总需执行股数
λ: 风险厌恶参数
"""
import numpy as np
import matplotlib.pyplot as plt
class AlmgrenChriss:
"""Almgren-Chriss 最优执行模型"""
def __init__(self, sigma: float, eta: float, gamma: float,
T: float = 1.0, N: int = 20, lambd: float = 1e-6):
"""
参数:
sigma: 日波动率 (如 0.02 = 2%)
eta: 临时冲击系数
gamma: 永久冲击系数
T: 总时间 (天)
N: 离散时间步数
lambd: 风险厌恶参数
"""
self.sigma = sigma
self.eta = eta
self.gamma = gamma
self.T = T
self.N = N
self.lambd = lambd
self.dt = T / N # 时间步长
# 计算最优策略参数
self.kappa = np.sqrt(
self.sigma ** 2 / (2 * self.lambd * self.eta)
) # 风险中性速率
# 计算交易轨迹
self._compute_optimal_trajectory()
def _compute_optimal_trajectory(self):
"""计算最优交易轨迹"""
# 时间网格
self.tau = np.array([
self.T - i * self.dt for i in range(self.N + 1)
])
# 最优轨迹: n_j = X * (sinh(kappa * tau_j) / sinh(kappa * T))
self.optimal_participation = (
np.sinh(self.kappa * self.tau) / np.sinh(self.kappa * self.T)
)
def get_execution_schedule(self, total_shares: float) -> pd.DataFrame:
"""
获取执行计划
返回:
DataFrame: [step, time, remaining_shares, trade_size, participation_rate]
"""
remaining = total_shares * self.optimal_participation
trade_size = -np.diff(remaining)
times = np.linspace(0, self.T, self.N + 1)
schedule = pd.DataFrame({
"step": range(self.N + 1),
"time": times,
"remaining_shares": remaining,
"trade_size": np.append(trade_size, 0),
"participation_rate": self.optimal_participation,
})
return schedule
def estimate_costs(self, total_shares: float,
daily_volume: float) -> dict:
"""
估计执行成本
返回:
临时冲击成本、永久冲击成本、总成本、有效价差
"""
avg_daily_volume = daily_volume / 252 # 日均成交量
# 永久冲击成本 ≈ γ * X² / (2 * T)
permanent_impact = self.gamma * total_shares ** 2 / (2 * self.T)
# 临时冲击成本 ≈ η * Σ n_j²
schedule = self.get_execution_schedule(total_shares)
temp_impact = self.eta * np.sum(schedule["trade_size"] ** 2)
# 时机风险 ≈ σ * sqrt(T/3) * X(近似)
timing_risk = self.sigma * np.sqrt(self.T / 3) * total_shares
return {
"permanent_impact_bps": permanent_impact / total_shares * 10000,
"temporary_impact_bps": temp_impact / total_shares * 10000,
"timing_risk_bps": timing_risk / total_shares * 10000,
"total_expected_cost_bps": (
permanent_impact + temp_impact
) / total_shares * 10000,
}
# 使用示例
ac = AlmgrenChriss(
sigma=0.02, # 2% 日波动率
eta=0.05, # 临时冲击系数
gamma=0.02, # 永久冲击系数
T=1.0, # 1 天内执行
N=20, # 20 个时间步
lambd=1e-6, # 风险厌恶
)
schedule = ac.get_execution_schedule(total_shares=100000)
costs = ac.estimate_costs(total_shares=100000, daily_volume=50000000)
print(f"预期总执行成本: {costs['total_expected_cost_bps']:.2f} bps")
print(f"永久冲击: {costs['permanent_impact_bps']:.2f} bps")
print(f"临时冲击: {costs['temporary_impact_bps']:.2f} bps")2.3 SOR 与 POV
SOR(Smart Order Routing, 智能订单路由) 自动选择最优交易所和流动性池执行订单。
POV(Percentage of Volume, 参与率控制) 限制订单执行速度不超过市场成交量的某个比例。
"""
SOR / POV 执行策略
SOR 核心逻辑:
1. 查询各交易所的报价和深度
2. 按价格最优 → 费用最低 → 延迟最短的优先级排序
3. 拆分订单到多个交易所执行
POV 核心逻辑:
1. 实时监控市场成交量
2. 控制自身执行量 ≤ POV% × 市场成交量
3. 市场流动性好时多执行,流动性差时少执行
"""
import numpy as np
def smart_order_route(order_qty: float, venue_quotes: list) -> list:
"""
智能订单路由
参数:
order_qty: 总订单量
venue_quotes: 交易所报价列表
[{"venue": "SSE", "price": 10.01, "available_qty": 5000, "fee_rate": 0.0001}, ...]
返回:
各交易所的子订单列表
"""
# 按价格排序(买单价从低到高,卖单价从高到低)
sorted_quotes = sorted(venue_quotes, key=lambda x: x["price"])
remaining = order_qty
allocations = []
for quote in sorted_quotes:
if remaining <= 0:
break
# 在该交易所执行的量 = min(剩余量, 可用量)
exec_qty = min(remaining, quote["available_qty"])
remaining -= exec_qty
allocations.append({
"venue": quote["venue"],
"price": quote["price"],
"qty": exec_qty,
"fee": exec_qty * quote["price"] * quote["fee_rate"],
})
vwap = sum(a["qty"] * a["price"] for a in allocations) / sum(
a["qty"] for a in allocations
) if allocations else 0
return allocations
def pov_execute(target_qty: float, pov_rate: float,
market_volume_series: np.ndarray) -> list:
"""
POV 参与率执行
参数:
target_qty: 目标执行总量
pov_rate: 参与率 (如 0.1 = 市场成交量的 10%)
market_volume_series: 预期/实时市场成交量序列
返回:
每个时间步的目标执行量
"""
exec_schedule = []
remaining = target_qty
for market_vol in market_volume_series:
if remaining <= 0:
exec_schedule.append(0)
continue
# POV 限制:执行量不超过市场成交量的 pov_rate
max_exec = market_vol * pov_rate
exec_qty = min(remaining, max_exec)
exec_schedule.append(exec_qty)
remaining -= exec_qty
completion_rate = (target_qty - remaining) / target_qty
print(f"POV 执行完成率: {completion_rate:.1%}")
return exec_schedule2.4 TCA 交易成本分析
TCA(Transaction Cost Analysis)量化评估执行质量,识别成本来源并优化。
"""
TCA 交易成本分析框架
成本分解:
总成本 = 显性成本 + 隐性成本
= (佣金 + 印花税 + 过户费) + (买卖价差 + 市场冲击 + 时机成本)
买卖价差 (Spread): 买一卖一之间的差价
市场冲击 (Impact): 大单推动价格的不利变动
时机成本 (Timing): 等待执行期间价格的不利变动
"""
import pandas as pd
import numpy as np
class TCAnalyzer:
"""交易成本分析器"""
def __init__(self):
self.trades = []
def add_trade(self, trade: dict):
"""记录一笔成交"""
self.trades.append(trade)
def analyze(self) -> dict:
"""
执行成本分析
输入 trades 中每笔记录需包含:
stock, side, qty, price, benchmark_price,
mid_price, arrival_price, decision_price
"""
df = pd.DataFrame(self.trades)
if df.empty:
return {}
total_qty = df["qty"].sum()
vwap = (df["qty"] * df["price"]).sum() / total_qty
# 1. 显性成本(佣金 + 税费)
commission_rate = 0.0003 # 佣金万三
stamp_tax_rate = 0.001 # 印花税千一(仅卖出)
explicit_cost = (
total_qty * vwap * commission_rate
+ df[df["side"] == "sell"]["qty"].sum() * vwap * stamp_tax_rate
)
# 2. 买卖价差成本
spread_cost = abs(df["price"] - df["mid_price"]).sum() * df["qty"]
# 3. 市场冲击成本(相对于 VWAP 基准)
# 买入冲击 = (执行价 - 基准价) * 量
impact_cost = 0
for _, row in df.iterrows():
if row["side"] == "buy":
impact_cost += (row["price"] - row["benchmark_price"]) * row["qty"]
else:
impact_cost += (row["benchmark_price"] - row["price"]) * row["qty"]
# 4. 时机成本(相对于决策价格)
timing_cost = 0
for _, row in df.iterrows():
if row["side"] == "buy":
timing_cost += (row["price"] - row["arrival_price"]) * row["qty"]
else:
timing_cost += (row["arrival_price"] - row["price"]) * row["qty"]
# 5. 实现差(Implementation Shortfall)
# IS = (决策价 - 执行价) * 方向 * 数量
is_total = 0
for _, row in df.iterrows():
direction = 1 if row["side"] == "buy" else -1
is_total += direction * (row["decision_price"] - row["price"]) * row["qty"]
# 汇总(转换为 bps)
notional = total_qty * vwap
report = {
"total_notional": notional,
"vwap": vwap,
"total_shares": total_qty,
"costs_bps": {
"explicit": explicit_cost / notional * 10000,
"spread": spread_cost / notional * 10000,
"market_impact": impact_cost / notional * 10000,
"timing": timing_cost / notional * 10000,
"implementation_shortfall": is_total / notional * 10000,
"total": (explicit_cost + spread_cost + impact_cost
+ timing_cost) / notional * 10000,
},
}
return report
def print_report(self, report: dict):
"""打印成本分析报告"""
print("=" * 60)
print("交易成本分析报告 (TCA)")
print("=" * 60)
print(f"总成交额: {report['total_notional']:,.0f}")
print(f"VWAP: {report['vwap']:.4f}")
print(f"总股数: {report['total_shares']:,.0f}")
print("-" * 40)
print(f"{'成本类别':<25} {'金额 (bps)':>12}")
print("-" * 40)
for cost_name, cost_bps in report["costs_bps"].items():
label_map = {
"explicit": "显性成本(佣金+税费)",
"spread": "买卖价差成本",
"market_impact": "市场冲击成本",
"timing": "时机成本",
"implementation_shortfall": "实现差 (IS)",
"total": "总成本",
}
label = label_map.get(cost_name, cost_name)
marker = ">>>" if cost_name == "total" else " "
print(f"{marker} {label:<22} {cost_bps:>10.2f} bps")
print("=" * 60)2.5 执行系统架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ 算法执行系统架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 策略信号层 │ │ 订单管理层 │ │ 执行引擎层 │ │
│ │ │ │ │ │ │ │
│ │ Alpha 信号 │ ─→ │ 订单拆分 │ ─→ │ TWAP/VWAP │ │
│ │ 目标仓位 │ │ 风控检查 │ │ Almgren-C │ │
│ │ 风险预算 │ │ 限额管理 │ │ POV/SOR │ │
│ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────┐ │
│ │ 市场数据层 │ │
│ │ 实时行情 / 订单簿 / 成交 │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ 交易所接入层 │ │
│ │ SSE / SZSE / CFFEX / 港股 │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ 监控与分析层 │ │
│ │ TCA / 执行报告 / 异常告警 │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心总结
┌──────────────────────────────────────────────────────────────────┐
│ 数据基础设施与算法执行核心要点 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 1. 存储选型:DuckDB + Parquet 是中小团队最优解 │
│ 零运维、列式存储、压缩高效、与 pandas 无缝集成 │
│ │
│ 2. 因子库是核心资产 │
│ 注册 → 计算 → 存储 → 版本管理 → 血缘追踪 │
│ │
│ 3. 数据管道五步法 │
│ 采集 → 清洗 → 计算 → 校验 → 存储 │
│ │
│ 4. 复权用前复权,执行用不复权 │
│ 前复权适合回测,不复权适合实盘执行 │
│ │
│ 5. 执行算法:TWAP 简单,VWAP 跟随,AC 最优 │
│ 小单用 TWAP,大单用 Almgren-Chriss │
│ │
│ 6. TCA 是执行质量的唯一度量标准 │
│ 将成本分解为价差/冲击/时机,逐项优化 │
│ │
└──────────────────────────────────────────────────────────────────┘