数据基础设施与算法执行

构建可靠的数据底座和高效的执行系统,是量化投资从研究到实盘的关键桥梁


Part 1:数据基础设施

1.1 存储选型对比

量化数据具有时序性强、读多写少、需要历史回溯等特点,存储选型直接影响系统性能。

存储方案适用场景优势劣势量化适用度
PostgreSQL + TimescaleDB中小团队、全功能需求SQL 生态、时序扩展、免费写入性能上限
DuckDB本地分析、研究环境零部署、列式、极快不支持并发写入
ClickHouse高频数据、大规模查询列式存储、写入极快运维复杂中高
Parquet 文件历史数据存储、离线分析压缩率高、跨平台、免费不支持实时更新
kdb+/q高频交易、专业量化极致性能、原生时序昂贵、学习曲线陡中(大机构)
Redis缓存、实时行情内存级速度数据持久化弱中(辅助层)
Arctic因子/研究数据管理Python 原生、版本控制依赖 MongoDB中高
AWS S3 + Athena云端、低成本归档无限扩展、按需付费查询延迟较高

实战建议:小团队推荐 DuckDB + Parquet 的组合,简单高效零运维。需要生产级数据库时,PostgreSQL + TimescaleDB 是最平衡的选择。

"""
DuckDB + Parquet 量化数据存储方案
 
DuckDB 是嵌入式的列式分析数据库,零配置,与 Parquet 天然兼容。
"""
import duckdb
import pandas as pd
import numpy as np
from pathlib import Path
 
# 创建内存数据库(或连接持久化文件)
con = duckdb.connect(":memory:")  # 或 duckdb.connect("quant.duckdb")
 
# 示例:创建行情表并从 Parquet 文件加载
# 假设已有 parquet 文件: data/markets/daily/A股日线.parquet
con.execute("""
    CREATE TABLE daily_quotes AS
    SELECT * FROM read_parquet('data/markets/daily/*.parquet',
        hive_partitioning=true)
    WHERE date >= '2018-01-01'
""")
 
# 高效查询示例
df = con.execute("""
    SELECT
        stock_code,
        date,
        close,
        volume,
        -- 计算滚动 20 日收益率
        (close / lag(close, 20) OVER (PARTITION BY stock_code ORDER BY date) - 1)
            AS return_20d,
        -- 计算 20 日成交量均值
        avg(volume) OVER (
            PARTITION BY stock_code
            ORDER BY date
            ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
        ) AS volume_ma20
    FROM daily_quotes
    WHERE stock_code IN ('000001.SZ', '600519.SH')
    ORDER BY stock_code, date
""").df()
 
print(f"查询结果: {len(df)} 行, 耗时极低(列式引擎优势)")

1.2 因子库设计

因子库是量化研究的核心资产管理系统,需要支持因子注册、版本控制、血缘追踪。

"""
因子库设计 — FactorLibrary
 
核心功能:
    1. 因子注册:定义因子元数据(名称、描述、依赖、频率)
    2. 因子计算:自动解析依赖关系,按拓扑排序执行
    3. 版本管理:记录因子计算逻辑的变更历史
    4. 血缘追踪:因子依赖关系图,便于影响分析
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Callable, Optional
from dataclasses import dataclass, field
 
 
@dataclass
class FactorMeta:
    """因子元数据"""
    name: str                    # 因子名称(唯一标识)
    description: str             # 因子描述
    category: str                # 分类(value/momentum/volatility/quality)
    frequency: str               # 频率(daily/weekly/monthly)
    dependencies: list = field(default_factory=list)  # 依赖的因子/字段
    version: str = "1.0"         # 版本号
    author: str = ""             # 创建者
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
 
 
class FactorLibrary:
    """因子库管理器"""
 
    def __init__(self, storage_path: str = "./factors/"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
        self.factors: dict[str, FactorMeta] = {}
        self.computers: dict[str, Callable] = {}
 
    def register(self, meta: FactorMeta, compute_fn: Callable):
        """
        注册因子
 
        参数:
            meta: 因子元数据
            compute_fn: 计算函数,签名: fn(data: dict) -> pd.DataFrame
        """
        if meta.name in self.factors:
            print(f"警告: 因子 {meta.name} 已存在,将覆盖")
        self.factors[meta.name] = meta
        self.computers[meta.name] = compute_fn
        print(f"已注册因子: {meta.name} (v{meta.version})")
 
    def compute_factor(self, name: str, data: dict) -> pd.DataFrame:
        """计算单个因子"""
        if name not in self.computers:
            raise ValueError(f"因子 {name} 未注册")
 
        # 检查依赖是否满足
        meta = self.factors[name]
        missing = [d for d in meta.dependencies if d not in data]
        if missing:
            raise ValueError(f"缺少依赖数据: {missing}")
 
        result = self.computers[name](data)
        return result
 
    def save_factor(self, name: str, df: pd.DataFrame):
        """保存因子到 Parquet 文件"""
        meta = self.factors[name]
        factor_dir = self.storage_path / meta.category
        factor_dir.mkdir(parents=True, exist_ok=True)
        filepath = factor_dir / f"{name}_v{meta.version}.parquet"
        df.to_parquet(filepath, index=False)
        print(f"因子已保存: {filepath}")
 
    def list_factors(self) -> pd.DataFrame:
        """列出所有已注册因子"""
        records = []
        for name, meta in self.factors.items():
            records.append({
                "name": name,
                "category": meta.category,
                "frequency": meta.frequency,
                "version": meta.version,
                "dependencies": ", ".join(meta.dependencies),
                "author": meta.author,
            })
        return pd.DataFrame(records)
 
 
# 使用示例:注册常用因子
lib = FactorLibrary()
 
# 注册动量因子
lib.register(
    meta=FactorMeta(
        name="momentum_20d",
        description="20日动量因子:过去20日收益率",
        category="momentum",
        frequency="daily",
        dependencies=["close"],
    ),
    compute_fn=lambda data: pd.DataFrame({
        "date": data["close"]["date"],
        "stock": data["close"]["stock"],
        "momentum_20d": data["close"].groupby("stock")["close"]
            .pct_change(20),
    }),
)
 
# 注册波动率因子
lib.register(
    meta=FactorMeta(
        name="volatility_20d",
        description="20日年化波动率",
        category="volatility",
        frequency="daily",
        dependencies=["close"],
    ),
    compute_fn=lambda data: pd.DataFrame({
        "date": data["close"]["date"],
        "stock": data["close"]["stock"],
        "volatility_20d": data["close"].groupby("stock")["close"]
            .pct_change().rolling(20).std() * np.sqrt(252),
    }),
)
 
print(lib.list_factors())

1.3 数据管道

数据管道将分散的数据源统一为干净、标准化的研究数据。

┌─────────────────────────────────────────────────────────────────────────────┐
│                        量化数据管道架构                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                                     │
│  │ 行情数据 │  │ 财务数据 │  │ 另类数据 │    数据源层                         │
│  └────┬────┘  └────┬────┘  └────┬────┘                                     │
│       │            │            │                                           │
│       ▼            ▼            ▼                                           │
│  ┌──────────────────────────────────────┐                                  │
│  │          数据采集层 (Ingestion)        │                                  │
│  │   行情API / 财报爬虫 / 另类数据接口    │                                  │
│  └──────────────────┬───────────────────┘                                  │
│                     │                                                       │
│                     ▼                                                       │
│  ┌──────────────────────────────────────┐                                  │
│  │          数据清洗层 (Cleaning)         │                                  │
│  │  停牌过滤 / 异常值检测 / 复权处理      │                                  │
│  └──────────────────┬───────────────────┘                                  │
│                     │                                                       │
│                     ▼                                                       │
│  ┌──────────────────────────────────────┐                                  │
│  │         数据计算层 (Compute)          │                                  │
│  │  收益率 / 技术指标 / 因子计算          │                                  │
│  └──────────────────┬───────────────────┘                                  │
│                     │                                                       │
│                     ▼                                                       │
│  ┌──────────────────────────────────────┐                                  │
│  │          数据存储层 (Storage)          │                                  │
│  │  Parquet / DuckDB / TimescaleDB      │                                  │
│  └──────────────────┬───────────────────┘                                  │
│                     │                                                       │
│                     ▼                                                       │
│  ┌──────────────────────────────────────┐                                  │
│  │          数据服务层 (Serving)          │                                  │
│  │  研究查询 / 回测接口 / 实盘推送        │                                  │
│  └──────────────────────────────────────┘                                  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
"""
数据管道实现 — 每日数据更新流程
"""
import pandas as pd
import numpy as np
from pathlib import Path
 
 
class DataPipeline:
    """量化数据管道"""
 
    def __init__(self, raw_dir: str = "./data/raw/",
                 clean_dir: str = "./data/clean/",
                 factor_dir: str = "./data/factors/"):
        self.raw_dir = Path(raw_dir)
        self.clean_dir = Path(clean_dir)
        self.factor_dir = Path(factor_dir)
        # 确保目录存在
        for d in [self.raw_dir, self.clean_dir, self.factor_dir]:
            d.mkdir(parents=True, exist_ok=True)
 
    def run_daily_update(self, trade_date: str):
        """
        执行每日数据更新
 
        步骤:
            1. 采集原始数据
            2. 清洗和标准化
            3. 计算衍生指标
            4. 质量校验
            5. 存储
        """
        print(f"[{trade_date}] 开始每日数据更新")
 
        # Step 1: 采集(此处为示例,实际接入数据源 API)
        raw_quotes = self._fetch_daily_quotes(trade_date)
 
        # Step 2: 清洗
        clean_quotes = self._clean_quotes(raw_quotes)
 
        # Step 3: 计算衍生指标
        enriched = self._compute_derived(clean_quotes)
 
        # Step 4: 质量校验
        issues = self._validate_data(enriched, trade_date)
        if issues:
            print(f"[警告] 数据质量问题: {issues}")
 
        # Step 5: 存储
        self._save_data(enriched, trade_date)
 
        print(f"[{trade_date}] 数据更新完成")
 
    def _fetch_daily_quotes(self, trade_date: str) -> pd.DataFrame:
        """获取日线行情(示例)"""
        # 实际实现中调用行情 API
        return pd.DataFrame()
 
    def _clean_quotes(self, df: pd.DataFrame) -> pd.DataFrame:
        """清洗行情数据"""
        # 去除停牌日(成交量为0且价格不变)
        df = df[~((df["volume"] == 0) & (df["close"] == df["pre_close"]))]
        # 去除异常值(涨跌幅超过20%且非ST)
        df["pct_change"] = df["close"].pct_change()
        df = df[df["pct_change"].between(-0.2, 0.2)]
        return df
 
    def _compute_derived(self, df: pd.DataFrame) -> pd.DataFrame:
        """计算衍生指标"""
        df = df.sort_values(["stock", "date"])
        df["return_1d"] = df.groupby("stock")["close"].pct_change(1)
        df["return_5d"] = df.groupby("stock")["close"].pct_change(5)
        df["turnover_rate"] = df["volume"] / df["shares_outstanding"]
        return df
 
    def _validate_data(self, df: pd.DataFrame, trade_date: str) -> list:
        """数据质量校验(见 1.5 节)"""
        issues = []
        if df.empty:
            issues.append("数据为空")
        if df["stock"].nunique() < 100:
            issues.append(f"股票数量异常: {df['stock'].nunique()}")
        return issues
 
    def _save_data(self, df: pd.DataFrame, trade_date: str):
        """保存清洗后的数据"""
        # 按日期分区存储为 Parquet
        partition_dir = self.clean_dir / f"date={trade_date}"
        partition_dir.mkdir(exist_ok=True)
        df.to_parquet(partition_dir / "daily_quotes.parquet", index=False)

1.4 数据清洗与复权

复权是量化数据处理中最基础也是最容易出错的操作之一。

"""
数据清洗与复权处理
 
复权类型:
    - 前复权 (Forward Adjusted): 以最新价格为基准,历史价格向下调整
      → 保持最新价格不变,历史 K 线连续
      → 适合技术分析和回测(推荐)
 
    - 后复权 (Backward Adjusted): 以 IPO 价格为基准,当前价格向上调整
      → 保持历史价格不变,最新价格可能很高
      → 适合看历史涨幅
 
    - 不复权: 使用真实交易价格
      → 除权除息日会产生价格跳空
      → 适合执行层面分析
"""
import pandas as pd
import numpy as np
 
 
def compute_adj_factor(df: pd.DataFrame) -> pd.DataFrame:
    """
    计算复权因子
 
    输入 DataFrame 需要包含:
        date, stock, open, high, low, close, volume
        dividend: 分红金额(每股)
        split_ratio: 拆股比例(如 10送10 则为 2.0)
 
    输出:
        adj_factor: 复权因子序列
    """
    df = df.sort_values(["stock", "date"]).copy()
 
    # 累计复权因子
    df["adj_factor"] = 1.0
 
    for stock in df["stock"].unique():
        mask = df["stock"] == stock
        stock_df = df[mask].copy()
        stock_df["adj_factor"] = 1.0
 
        # 从最后一天向前累计
        for i in range(len(stock_df) - 2, -1, -1):
            current = stock_df.iloc[i + 1]
            prev = stock_df.iloc[i]
 
            # 复权因子 = (前一日收盘价 / 后一日复权后价格)
            # 简化处理:只考虑分红和拆股
            dividend = prev.get("dividend", 0) or 0
            split = prev.get("split_ratio", 1.0) or 1.0
 
            stock_df.iloc[i, stock_df.columns.get_loc("adj_factor")] = (
                stock_df.iloc[i + 1]["adj_factor"]
                * (current["close"] / (current["close"] * split - dividend))
            )
 
        df.loc[mask, "adj_factor"] = stock_df["adj_factor"].values
 
    return df
 
 
def forward_adjust(df: pd.DataFrame) -> pd.DataFrame:
    """
    前复权处理
 
    前复权公式: adj_price = raw_price * adj_factor
    最新 adj_factor = 1.0,历史 adj_factor > 1.0(历史价格被调低)
    """
    adj_df = compute_adj_factor(df).copy()
 
    # 前复权:价格除以复权因子
    for col in ["open", "high", "low", "close"]:
        adj_df[f"adj_{col}"] = adj_df[col] / adj_df["adj_factor"]
 
    # 成交量乘以复权因子(保持成交额不变)
    adj_df["adj_volume"] = adj_df["volume"] * adj_df["adj_factor"]
 
    # 计算前复权收益率(等价于使用真实价格计算的收益率)
    adj_df["adj_return"] = adj_df.groupby("stock")["adj_close"].pct_change()
 
    return adj_df

1.5 数据质量校验

数据质量是量化系统的生命线,垃圾数据只会产出垃圾策略。

校验维度检查项阈值建议处理方式
完整性每日股票数量> 90% 历史均值告警 + 手动检查
一致性OHLC 关系Low Close High修正或标记
时效性数据更新时间交易日 18:00 前告警
准确性涨跌幅匹配abs(calc - reported) < 0.01%修正
连续性价格跳变单日涨跌幅 < 30%(非ST)标记异常
唯一性去重检查无重复记录去重
"""
数据质量校验框架
"""
import pandas as pd
import numpy as np
from dataclasses import dataclass
 
 
@dataclass
class QualityIssue:
    """数据质量问题"""
    category: str     # 问题类别
    severity: str     # 严重程度(error/warning/info)
    description: str  # 问题描述
    affected_records: int  # 影响记录数
 
 
class DataQualityChecker:
    """数据质量检查器"""
 
    def __init__(self, rules: dict = None):
        self.rules = rules or {}
        self.issues: list = []
 
    def check_completeness(self, df: pd.DataFrame,
                           date_col: str, stock_col: str,
                           expected_stocks: int = 4000) -> list:
        """完整性检查:股票数量是否异常"""
        issues = []
        daily_counts = df.groupby(date_col)[stock_col].nunique()
        mean_count = daily_counts.mean()
        std_count = daily_counts.std()
 
        for date, count in daily_counts.items():
            if count < mean_count - 3 * std_count:
                issues.append(QualityIssue(
                    category="completeness",
                    severity="error",
                    description=f"{date}{count} 只股票,期望约 {int(mean_count)}",
                    affected_records=expected_stocks - count,
                ))
        return issues
 
    def check_ohlc_consistency(self, df: pd.DataFrame) -> list:
        """OHLC 一致性检查"""
        issues = []
        # 检查 Low <= Close <= High
        invalid_high = df[df["close"] > df["high"]]
        invalid_low = df[df["close"] < df["low"]]
        invalid_range = df[df["low"] > df["high"]]
 
        if len(invalid_high) > 0:
            issues.append(QualityIssue(
                category="consistency",
                severity="error",
                description=f"Close > High: {len(invalid_high)} 条记录",
                affected_records=len(invalid_high),
            ))
        if len(invalid_low) > 0:
            issues.append(QualityIssue(
                category="consistency",
                severity="error",
                description=f"Close < Low: {len(invalid_low)} 条记录",
                affected_records=len(invalid_low),
            ))
        return issues
 
    def check_price_outliers(self, df: pd.DataFrame,
                             max_change: float = 0.30) -> list:
        """价格异常检查(排除 ST)"""
        issues = []
        df["pct_change"] = df.groupby("stock")["close"].pct_change()
        outliers = df[abs(df["pct_change"]) > max_change]
 
        if len(outliers) > 0:
            issues.append(QualityIssue(
                category="accuracy",
                severity="warning",
                description=f"涨跌幅超过 {max_change:.0%}: {len(outliers)} 条记录",
                affected_records=len(outliers),
            ))
        return issues
 
    def run_all_checks(self, df: pd.DataFrame) -> list:
        """执行所有检查"""
        all_issues = []
        all_issues.extend(self.check_completeness(df, "date", "stock"))
        all_issues.extend(self.check_ohlc_consistency(df))
        all_issues.extend(self.check_price_outliers(df))
 
        # 汇总报告
        errors = sum(1 for i in all_issues if i.severity == "error")
        warnings = sum(1 for i in all_issues if i.severity == "warning")
        print(f"质量检查完成: {errors} 个错误, {warnings} 个警告")
 
        for issue in all_issues:
            print(f"  [{issue.severity.upper()}] {issue.category}: {issue.description}")
 
        return all_issues

1.6 Parquet 格式

Parquet 是量化数据存储的事实标准格式,理解其设计原理有助于高效使用。

特性说明量化场景优势
列式存储按列而非按行存储数据只读取需要的列,IO 减少 10-100x
压缩编码列级压缩(Snappy/ZSTD/GZIP)量化数据压缩比 5-20x
类型保留保留原始数据类型(int32/float64)避免字符串解析开销
分区裁剪按日期/股票分区,查询自动跳过查询特定日期极快
谓词下推过滤条件下推到存储层减少读取数据量
Schema 演进支持新增列,兼容旧文件因子迭代不影响历史数据
"""
Parquet 文件操作最佳实践
"""
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
 
# 写入 Parquet — 最佳实践
def write_parquet_optimized(df: pd.DataFrame, filepath: str,
                            partition_cols: list = None):
    """
    优化写入 Parquet 文件
 
    关键优化:
        1. 指定列类型(减少存储空间)
        2. 使用分区(加速查询)
        3. 选择压缩算法(ZSTD 平衡速度和压缩率)
    """
    # 优化数据类型
    dtypes = {
        "date": "datetime64[ns]",
        "stock": "category",        # 分类列用字典编码
        "open": "float32",          # float32 足够精度
        "high": "float32",
        "low": "float32",
        "close": "float32",
        "volume": "int64",
        "turnover": "float64",      # 成交额需要更高精度
    }
    for col, dtype in dtypes.items():
        if col in df.columns:
            df[col] = df[col].astype(dtype)
 
    # 写入(使用 ZSTD 压缩,行组大小 100000 行)
    table = pa.Table.from_pandas(df, preserve_index=False)
 
    pq.write_to_dataset(
        table,
        root_path=filepath,
        partition_cols=partition_cols,
        compression="zstd",
        row_group_size=100_000,
        use_dictionary=True,
    )
 
 
# 读取 Parquet — 带过滤条件
def read_parquet_filtered(filepath: str, date_range: tuple = None,
                          stocks: list = None) -> pd.DataFrame:
    """
    高效读取 Parquet(谓词下推)
 
    只有满足条件的数据会被读入内存,大幅减少 IO
    """
    filters = []
    if date_range:
        filters.append(("date", ">=", date_range[0]))
        filters.append(("date", "<=", date_range[1]))
    if stocks:
        filters.append(("stock", "in", stocks))
 
    return pd.read_parquet(filepath, filters=filters if filters else None)

Part 2:算法执行系统

2.1 TWAP / VWAP 执行算法

TWAP(时间加权平均价格)和 VWAP(成交量加权平均价格)是最基础的执行算法。

"""
TWAP 和 VWAP 执行算法
 
TWAP: 将订单均匀拆分到时间窗口内执行
VWAP: 参考历史成交量分布拆分订单,跟随市场节奏
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass
 
 
@dataclass
class ExecutionOrder:
    """执行订单"""
    stock: str
    side: str           # buy / sell
    total_qty: float    # 总数量
    start_time: str
    end_time: str
    algo: str           # twap / vwap
    limit_price: float  # 限价
    filled_qty: float = 0
    filled_price: float = 0
    child_orders: list = None
 
 
class TWAPExecutor:
    """
    TWAP 执行器
 
    策略: 将大单均匀拆分为 N 个时间片,每个时间片执行等量订单
    优势: 简单、可预测、市场冲击小
    劣势: 不考虑成交量变化,可能在低流动性时段执行
    """
 
    def __init__(self, num_slices: int = 20):
        self.num_slices = num_slices
 
    def generate_schedule(self, order: ExecutionOrder) -> pd.DataFrame:
        """
        生成 TWAP 执行计划
 
        返回: 每个时间片的执行量
        """
        total_minutes = self._time_diff_minutes(order.start_time, order.end_time)
        slice_minutes = total_minutes / self.num_slices
 
        qty_per_slice = order.total_qty / self.num_slices
 
        schedule = []
        for i in range(self.num_slices):
            schedule.append({
                "slice": i + 1,
                "time": self._add_minutes(order.start_time, i * slice_minutes),
                "target_qty": qty_per_slice,
                "cumulative_qty": qty_per_slice * (i + 1),
            })
 
        return pd.DataFrame(schedule)
 
 
class VWAPExecutor:
    """
    VWAP 执行器
 
    策略: 参考历史同时段成交量分布,在高成交量的时段多执行
    优势: 更好地融入市场,降低市场冲击
    劣势: 需要历史成交量分布数据
    """
 
    def __init__(self, volume_profile: pd.Series):
        """
        参数:
            volume_profile: 历史成交量分布(按时段归一化)
                           index: 时段 (如 "09:30", "10:00")
                           value: 该时段成交量占比
        """
        self.volume_profile = volume_profile / volume_profile.sum()
 
    def generate_schedule(self, order: ExecutionOrder,
                          current_time: str = None) -> pd.DataFrame:
        """生成 VWAP 执行计划"""
        if current_time is None:
            current_time = order.start_time
 
        # 剩余时段的成交量分布
        remaining_profile = self.volume_profile[self.volume_profile.index >= current_time]
        remaining_profile = remaining_profile / remaining_profile.sum()
 
        schedule = []
        for time_slot, ratio in remaining_profile.items():
            schedule.append({
                "time": time_slot,
                "volume_ratio": ratio,
                "target_qty": order.total_qty * ratio,
            })
 
        df = pd.DataFrame(schedule)
        df["cumulative_qty"] = df["target_qty"].cumsum()
        return df

2.2 Almgren-Chriss 最优执行

Almgren-Chriss 模型是算法执行的理论基石,在市场冲击和时机风险之间寻求最优平衡。

核心思想:
    - 快速执行 → 高市场冲击,低时机风险
    - 缓慢执行 → 低市场冲击,高时机风险
    - 最优策略:在两者之间找到平衡

目标函数:
    min  E[C] + λ * Var[C]

    其中:
        C: 总执行成本 = 市场冲击 + 时机成本
        λ: 风险厌恶参数
        E[C]: 期望执行成本(主要由永久冲击决定)
        Var[C]: 执行成本方差(主要由价格波动决定)
"""
Almgren-Chriss 最优执行框架
 
模型参数:
    σ: 资产日波动率
    η: 临时冲击系数(每股交易对价格的即时影响)
    γ: 永久冲击系数(交易对基本面价格的影响)
    T: 总执行时间(天)
    N: 时间步数
    X: 总需执行股数
    λ: 风险厌恶参数
"""
import numpy as np
import matplotlib.pyplot as plt
 
 
class AlmgrenChriss:
    """Almgren-Chriss 最优执行模型"""
 
    def __init__(self, sigma: float, eta: float, gamma: float,
                 T: float = 1.0, N: int = 20, lambd: float = 1e-6):
        """
        参数:
            sigma: 日波动率 (如 0.02 = 2%)
            eta: 临时冲击系数
            gamma: 永久冲击系数
            T: 总时间 (天)
            N: 离散时间步数
            lambd: 风险厌恶参数
        """
        self.sigma = sigma
        self.eta = eta
        self.gamma = gamma
        self.T = T
        self.N = N
        self.lambd = lambd
        self.dt = T / N  # 时间步长
 
        # 计算最优策略参数
        self.kappa = np.sqrt(
            self.sigma ** 2 / (2 * self.lambd * self.eta)
        )  # 风险中性速率
 
        # 计算交易轨迹
        self._compute_optimal_trajectory()
 
    def _compute_optimal_trajectory(self):
        """计算最优交易轨迹"""
        # 时间网格
        self.tau = np.array([
            self.T - i * self.dt for i in range(self.N + 1)
        ])
 
        # 最优轨迹: n_j = X * (sinh(kappa * tau_j) / sinh(kappa * T))
        self.optimal_participation = (
            np.sinh(self.kappa * self.tau) / np.sinh(self.kappa * self.T)
        )
 
    def get_execution_schedule(self, total_shares: float) -> pd.DataFrame:
        """
        获取执行计划
 
        返回:
            DataFrame: [step, time, remaining_shares, trade_size, participation_rate]
        """
        remaining = total_shares * self.optimal_participation
        trade_size = -np.diff(remaining)
        times = np.linspace(0, self.T, self.N + 1)
 
        schedule = pd.DataFrame({
            "step": range(self.N + 1),
            "time": times,
            "remaining_shares": remaining,
            "trade_size": np.append(trade_size, 0),
            "participation_rate": self.optimal_participation,
        })
 
        return schedule
 
    def estimate_costs(self, total_shares: float,
                       daily_volume: float) -> dict:
        """
        估计执行成本
 
        返回:
            临时冲击成本、永久冲击成本、总成本、有效价差
        """
        avg_daily_volume = daily_volume / 252  # 日均成交量
 
        # 永久冲击成本 ≈ γ * X² / (2 * T)
        permanent_impact = self.gamma * total_shares ** 2 / (2 * self.T)
 
        # 临时冲击成本 ≈ η * Σ n_j²
        schedule = self.get_execution_schedule(total_shares)
        temp_impact = self.eta * np.sum(schedule["trade_size"] ** 2)
 
        # 时机风险 ≈ σ * sqrt(T/3) * X(近似)
        timing_risk = self.sigma * np.sqrt(self.T / 3) * total_shares
 
        return {
            "permanent_impact_bps": permanent_impact / total_shares * 10000,
            "temporary_impact_bps": temp_impact / total_shares * 10000,
            "timing_risk_bps": timing_risk / total_shares * 10000,
            "total_expected_cost_bps": (
                permanent_impact + temp_impact
            ) / total_shares * 10000,
        }
 
 
# 使用示例
ac = AlmgrenChriss(
    sigma=0.02,       # 2% 日波动率
    eta=0.05,         # 临时冲击系数
    gamma=0.02,       # 永久冲击系数
    T=1.0,            # 1 天内执行
    N=20,             # 20 个时间步
    lambd=1e-6,       # 风险厌恶
)
 
schedule = ac.get_execution_schedule(total_shares=100000)
costs = ac.estimate_costs(total_shares=100000, daily_volume=50000000)
 
print(f"预期总执行成本: {costs['total_expected_cost_bps']:.2f} bps")
print(f"永久冲击: {costs['permanent_impact_bps']:.2f} bps")
print(f"临时冲击: {costs['temporary_impact_bps']:.2f} bps")

2.3 SOR 与 POV

SOR(Smart Order Routing, 智能订单路由) 自动选择最优交易所和流动性池执行订单。

POV(Percentage of Volume, 参与率控制) 限制订单执行速度不超过市场成交量的某个比例。

"""
SOR / POV 执行策略
 
SOR 核心逻辑:
    1. 查询各交易所的报价和深度
    2. 按价格最优 → 费用最低 → 延迟最短的优先级排序
    3. 拆分订单到多个交易所执行
 
POV 核心逻辑:
    1. 实时监控市场成交量
    2. 控制自身执行量 ≤ POV% × 市场成交量
    3. 市场流动性好时多执行,流动性差时少执行
"""
import numpy as np
 
 
def smart_order_route(order_qty: float, venue_quotes: list) -> list:
    """
    智能订单路由
 
    参数:
        order_qty: 总订单量
        venue_quotes: 交易所报价列表
            [{"venue": "SSE", "price": 10.01, "available_qty": 5000, "fee_rate": 0.0001}, ...]
 
    返回:
        各交易所的子订单列表
    """
    # 按价格排序(买单价从低到高,卖单价从高到低)
    sorted_quotes = sorted(venue_quotes, key=lambda x: x["price"])
 
    remaining = order_qty
    allocations = []
 
    for quote in sorted_quotes:
        if remaining <= 0:
            break
 
        # 在该交易所执行的量 = min(剩余量, 可用量)
        exec_qty = min(remaining, quote["available_qty"])
        remaining -= exec_qty
 
        allocations.append({
            "venue": quote["venue"],
            "price": quote["price"],
            "qty": exec_qty,
            "fee": exec_qty * quote["price"] * quote["fee_rate"],
        })
 
    vwap = sum(a["qty"] * a["price"] for a in allocations) / sum(
        a["qty"] for a in allocations
    ) if allocations else 0
 
    return allocations
 
 
def pov_execute(target_qty: float, pov_rate: float,
                market_volume_series: np.ndarray) -> list:
    """
    POV 参与率执行
 
    参数:
        target_qty: 目标执行总量
        pov_rate: 参与率 (如 0.1 = 市场成交量的 10%)
        market_volume_series: 预期/实时市场成交量序列
 
    返回:
        每个时间步的目标执行量
    """
    exec_schedule = []
    remaining = target_qty
 
    for market_vol in market_volume_series:
        if remaining <= 0:
            exec_schedule.append(0)
            continue
 
        # POV 限制:执行量不超过市场成交量的 pov_rate
        max_exec = market_vol * pov_rate
        exec_qty = min(remaining, max_exec)
        exec_schedule.append(exec_qty)
        remaining -= exec_qty
 
    completion_rate = (target_qty - remaining) / target_qty
    print(f"POV 执行完成率: {completion_rate:.1%}")
 
    return exec_schedule

2.4 TCA 交易成本分析

TCA(Transaction Cost Analysis)量化评估执行质量,识别成本来源并优化。

"""
TCA 交易成本分析框架
 
成本分解:
    总成本 = 显性成本 + 隐性成本
           = (佣金 + 印花税 + 过户费) + (买卖价差 + 市场冲击 + 时机成本)
 
    买卖价差 (Spread): 买一卖一之间的差价
    市场冲击 (Impact): 大单推动价格的不利变动
    时机成本 (Timing): 等待执行期间价格的不利变动
"""
import pandas as pd
import numpy as np
 
 
class TCAnalyzer:
    """交易成本分析器"""
 
    def __init__(self):
        self.trades = []
 
    def add_trade(self, trade: dict):
        """记录一笔成交"""
        self.trades.append(trade)
 
    def analyze(self) -> dict:
        """
        执行成本分析
 
        输入 trades 中每笔记录需包含:
            stock, side, qty, price, benchmark_price,
            mid_price, arrival_price, decision_price
        """
        df = pd.DataFrame(self.trades)
        if df.empty:
            return {}
 
        total_qty = df["qty"].sum()
        vwap = (df["qty"] * df["price"]).sum() / total_qty
 
        # 1. 显性成本(佣金 + 税费)
        commission_rate = 0.0003  # 佣金万三
        stamp_tax_rate = 0.001    # 印花税千一(仅卖出)
        explicit_cost = (
            total_qty * vwap * commission_rate
            + df[df["side"] == "sell"]["qty"].sum() * vwap * stamp_tax_rate
        )
 
        # 2. 买卖价差成本
        spread_cost = abs(df["price"] - df["mid_price"]).sum() * df["qty"]
 
        # 3. 市场冲击成本(相对于 VWAP 基准)
        # 买入冲击 = (执行价 - 基准价) * 量
        impact_cost = 0
        for _, row in df.iterrows():
            if row["side"] == "buy":
                impact_cost += (row["price"] - row["benchmark_price"]) * row["qty"]
            else:
                impact_cost += (row["benchmark_price"] - row["price"]) * row["qty"]
 
        # 4. 时机成本(相对于决策价格)
        timing_cost = 0
        for _, row in df.iterrows():
            if row["side"] == "buy":
                timing_cost += (row["price"] - row["arrival_price"]) * row["qty"]
            else:
                timing_cost += (row["arrival_price"] - row["price"]) * row["qty"]
 
        # 5. 实现差(Implementation Shortfall)
        # IS = (决策价 - 执行价) * 方向 * 数量
        is_total = 0
        for _, row in df.iterrows():
            direction = 1 if row["side"] == "buy" else -1
            is_total += direction * (row["decision_price"] - row["price"]) * row["qty"]
 
        # 汇总(转换为 bps)
        notional = total_qty * vwap
 
        report = {
            "total_notional": notional,
            "vwap": vwap,
            "total_shares": total_qty,
            "costs_bps": {
                "explicit": explicit_cost / notional * 10000,
                "spread": spread_cost / notional * 10000,
                "market_impact": impact_cost / notional * 10000,
                "timing": timing_cost / notional * 10000,
                "implementation_shortfall": is_total / notional * 10000,
                "total": (explicit_cost + spread_cost + impact_cost
                         + timing_cost) / notional * 10000,
            },
        }
 
        return report
 
    def print_report(self, report: dict):
        """打印成本分析报告"""
        print("=" * 60)
        print("交易成本分析报告 (TCA)")
        print("=" * 60)
        print(f"总成交额: {report['total_notional']:,.0f}")
        print(f"VWAP:     {report['vwap']:.4f}")
        print(f"总股数:   {report['total_shares']:,.0f}")
        print("-" * 40)
        print(f"{'成本类别':<25} {'金额 (bps)':>12}")
        print("-" * 40)
        for cost_name, cost_bps in report["costs_bps"].items():
            label_map = {
                "explicit": "显性成本(佣金+税费)",
                "spread": "买卖价差成本",
                "market_impact": "市场冲击成本",
                "timing": "时机成本",
                "implementation_shortfall": "实现差 (IS)",
                "total": "总成本",
            }
            label = label_map.get(cost_name, cost_name)
            marker = ">>>" if cost_name == "total" else "   "
            print(f"{marker} {label:<22} {cost_bps:>10.2f} bps")
        print("=" * 60)

2.5 执行系统架构

┌─────────────────────────────────────────────────────────────────────────────┐
│                        算法执行系统架构                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐                  │
│  │  策略信号层   │     │  订单管理层   │     │  执行引擎层   │                  │
│  │             │     │             │     │             │                  │
│  │ Alpha 信号   │ ─→ │ 订单拆分     │ ─→ │ TWAP/VWAP   │                  │
│  │ 目标仓位     │     │ 风控检查     │     │ Almgren-C  │                  │
│  │ 风险预算     │     │ 限额管理     │     │ POV/SOR    │                  │
│  └─────────────┘     └──────┬──────┘     └──────┬──────┘                  │
│                             │                   │                          │
│                             ▼                   ▼                          │
│                      ┌──────────────────────────────┐                      │
│                      │        市场数据层             │                      │
│                      │  实时行情 / 订单簿 / 成交      │                      │
│                      └──────────────┬───────────────┘                      │
│                                     │                                      │
│                                     ▼                                      │
│                      ┌──────────────────────────────┐                      │
│                      │        交易所接入层           │                      │
│                      │  SSE / SZSE / CFFEX / 港股    │                      │
│                      └──────────────┬───────────────┘                      │
│                                     │                                      │
│                                     ▼                                      │
│                      ┌──────────────────────────────┐                      │
│                      │        监控与分析层           │                      │
│                      │  TCA / 执行报告 / 异常告警    │                      │
│                      └──────────────────────────────┘                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

核心总结

┌──────────────────────────────────────────────────────────────────┐
│                数据基础设施与算法执行核心要点                        │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. 存储选型:DuckDB + Parquet 是中小团队最优解                   │
│     零运维、列式存储、压缩高效、与 pandas 无缝集成                  │
│                                                                  │
│  2. 因子库是核心资产                                              │
│     注册 → 计算 → 存储 → 版本管理 → 血缘追踪                      │
│                                                                  │
│  3. 数据管道五步法                                                │
│     采集 → 清洗 → 计算 → 校验 → 存储                             │
│                                                                  │
│  4. 复权用前复权,执行用不复权                                     │
│     前复权适合回测,不复权适合实盘执行                              │
│                                                                  │
│  5. 执行算法:TWAP 简单,VWAP 跟随,AC 最优                       │
│     小单用 TWAP,大单用 Almgren-Chriss                            │
│                                                                  │
│  6. TCA 是执行质量的唯一度量标准                                    │
│     将成本分解为价差/冲击/时机,逐项优化                            │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘