Time-Series Data Splitting

1. What Makes Quantitative Time-Series Data Special

1.1 The Causality Constraint

The core problem

In quantitative investing, data splitting must strictly respect the causality principle: only historical data may be used to predict the future, and future information must never leak in.

Mathematical formulation

For a prediction made at time t:

ŷ_{t+1} = f(X_{≤t})

where X_{≤t} denotes all features observed at time t or earlier; no information from t+1, t+2, … may be included.

Typical mistakes that violate causality

# Wrong example 1: global standardization
from sklearn.preprocessing import StandardScaler
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # mean and variance computed over the full dataset
 
# As a result, the features at time t use statistics from t+1, t+2, ...
# Problem: scaler.fit uses the mean and std of ALL the data

# Wrong example 2: PCA dimensionality reduction
from sklearn.decomposition import PCA
 
pca = PCA(n_components=5)
X_pca = pca.fit_transform(X)  # principal components computed over the full dataset
 
# Problem: PCA's covariance matrix is based on the full dataset

The correct approach

# Correct example 1: rolling standardization
import numpy as np
from sklearn.preprocessing import StandardScaler
 
def rolling_standardize(X, window=252):
    """Scale each sample using statistics from the trailing window only."""
    X_scaled = np.zeros_like(X, dtype=float)
    for i in range(window, len(X)):
        scaler = StandardScaler()
        scaler.fit(X[i - window:i])                    # statistics from the past window only
        X_scaled[i] = scaler.transform(X[i:i + 1])[0]  # scale the current sample
    return X_scaled

1.2 Detecting Data Leakage

Look-ahead leakage

A feature's computation uses future data, e.g. a feature dated t that is built from prices observed after t.

Detection method

def detect_lookahead_leakage(x, y, window=5):
    """
    Check whether a feature may contain future information.
 
    Method: inspect the correlation between the feature and the target
    several periods ahead.
    """
    correlations = []
    for lag in range(1, window + 1):
        # Correlation between the feature at t and the target at t + lag
        corr = np.corrcoef(x[:-lag], y[lag:])[0, 1]
        correlations.append(abs(corr))
 
    # A strong correlation may indicate look-ahead leakage
    if max(correlations) > 0.1:
        print(f"Warning: potential look-ahead leakage, max correlation: {max(correlations):.4f}")
 
    return correlations

Cross-sectional leakage

A feature uses information from other stocks in the same cross-section.

Detection method

def detect_cross_section_leakage(X):
    """
    Check for cross-sectional information leakage.
 
    Method: inspect the instantaneous correlation of features across stocks.
    X is assumed to have shape [n_stocks, n_times, n_features].
    """
    n_stocks = X.shape[0]
    t_corr = []
    for t in range(X.shape[1]):
        # Correlation across all stocks at time t
        Xt = X[:, t, :]
        corr_matrix = np.corrcoef(Xt)
 
        # Average absolute off-diagonal correlation
        avg_corr = (np.sum(np.abs(corr_matrix)) - n_stocks) / (n_stocks * (n_stocks - 1))
        t_corr.append(avg_corr)
 
    return t_corr

2. Time-Series Cross-Validation Methods

2.1 Limitations of Traditional K-Fold

Why not random K-Fold?

# Wrong example: random K-Fold
from sklearn.model_selection import KFold
 
kf = KFold(n_splits=5, shuffle=True)
for train_idx, val_idx in kf.split(X):
    # Problem: train and validation interleave in time
    # e.g. train contains 2023 data while val contains 2022 data
    X_train, X_val = X[train_idx], X[val_idx]

Problem analysis

  1. Future information leakage: the validation set may contain data from before the training set
  2. Backtest distortion: does not match a realistic investment setting
  3. Evaluation bias: model performance is inflated

Illustration

Timeline: 2018 | 2019 | 2020 | 2021 | 2022 | 2023
Random K-Fold might produce:
Train: 2018, 2020, 2023
Val:   2019, 2021, 2022

This could never happen in live trading!
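The interleaving above can be verified directly in code. A minimal sketch (the synthetic index array is an assumption for illustration): with shuffled K-Fold, some validation indices fall before the latest training index, while TimeSeriesSplit keeps every validation index strictly after the training set.

```python
import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit

X = np.arange(100).reshape(-1, 1)  # 100 time-ordered samples

# Shuffled K-Fold: count folds whose earliest validation index
# precedes the latest training index (a causality violation).
kf = KFold(n_splits=5, shuffle=True, random_state=0)
kfold_violations = sum(
    val_idx.min() < train_idx.max() for train_idx, val_idx in kf.split(X)
)

# TimeSeriesSplit: validation always lies entirely after training.
tscv = TimeSeriesSplit(n_splits=5)
tscv_violations = sum(
    val_idx.min() < train_idx.max() for train_idx, val_idx in tscv.split(X)
)

print(kfold_violations, tscv_violations)
```

With shuffling, every fold typically violates time order; TimeSeriesSplit reports zero violations by construction.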

2.2 Time-Series Cross-Validation (TimeSeriesSplit)

Basic principle

from sklearn.model_selection import TimeSeriesSplit
 
tscv = TimeSeriesSplit(n_splits=5)
 
for train_idx, val_idx in tscv.split(X):
    # Training set:   [t_start, t_train_end]
    # Validation set: [t_train_end+1, t_val_end]
    X_train, X_val = X[train_idx], X[val_idx]

Schematic

Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |==== Train ====|=== Val ===|.....|
Fold 3: |===== Train =====|==== Val ====|..|
Fold 4: |====== Train ======|===== Val =====|
Fold 5: |======= Train =======|====== Val ==|

Implementation

import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
import lightgbm as lgb
 
def time_series_cv(X, y, params, n_splits=5):
    """
    Time-series cross-validation.
 
    Args:
        X: feature matrix, shape=[n_samples, n_features]
        y: target variable, shape=[n_samples]
        params: LightGBM parameters
        n_splits: number of folds
 
    Returns:
        val_scores: validation score of each fold
    """
    tscv = TimeSeriesSplit(n_splits=n_splits)
    val_scores = []
 
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        print(f"Fold {fold + 1}/{n_splits}")
        print(f"  Train: {train_idx[0]} - {train_idx[-1]}")
        print(f"  Val:   {val_idx[0]} - {val_idx[-1]}")
 
        # Split the data
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
 
        # Build datasets
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
 
        # Train the model
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
 
        # Evaluate
        y_pred = model.predict(X_val)
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        print(f"  Validation IC: {score:.4f}")
 
    return val_scores

2.3 Rolling-Window Cross-Validation

When to use

When the data distribution shifts sharply over time, train on a rolling window of fixed size.

Method comparison

Expanding window:
Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |==== Train ====|=== Val ===|.....|
Fold 3: |===== Train =====|==== Val ====|..|

Rolling window:
Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |...=== Train ===|== Val ==|.....|
Fold 3: |.....=== Train ===|== Val ==|..|

Implementation

def rolling_window_cv(X, y, params, train_size=252, val_size=21, step=21):
    """
    Rolling-window cross-validation.
 
    Args:
        train_size: training-window size (e.g. 252 trading days ≈ 1 year)
        val_size: validation-window size (e.g. 21 trading days ≈ 1 month)
        step: rolling step
 
    Returns:
        val_scores: validation score of each fold
        models: list of trained models
    """
    val_scores = []
    models = []
 
    n_samples = len(X)
    start_idx = train_size
 
    fold = 0
    while start_idx + val_size <= n_samples:
        print(f"Fold {fold + 1}")
 
        # Split the data
        train_start = start_idx - train_size
        train_end = start_idx
        val_start = start_idx
        val_end = start_idx + val_size
 
        print(f"  Train: {train_start} - {train_end}")
        print(f"  Val:   {val_start} - {val_end}")
 
        X_train, X_val = X[train_start:train_end], X[val_start:val_end]
        y_train, y_val = y[train_start:train_end], y[val_start:val_end]
 
        # Train the model
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
 
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
 
        # Evaluate
        y_pred = model.predict(X_val)
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        models.append(model)
 
        print(f"  Validation IC: {score:.4f}")
 
        # Roll the window forward
        start_idx += step
        fold += 1
 
    return val_scores, models

Application in quant

# Example: validating a quant factor with a rolling window
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
}
 
# Training window: 1 year (252 trading days)
# Validation window: 1 month (21 trading days)
# Rolling step: 1 month
val_scores, models = rolling_window_cv(
    X, y, params,
    train_size=252,
    val_size=21,
    step=21
)
 
print(f"Mean IC: {np.mean(val_scores):.4f}")
print(f"IC std: {np.std(val_scores):.4f}")

2.4 Walk-Forward Validation

Core idea

Simulate live trading: after each validation window, fold it into the training set and roll forward.

Difference from the rolling window

  • Rolling window: training window of fixed size
  • Walk-forward: training window grows over time

Implementation

def walk_forward_validation(X, y, params, initial_train_size=252, val_size=21, step=21):
    """
    Walk-forward validation.
 
    Args:
        initial_train_size: initial training-window size
        val_size: validation-window size
        step: step size
 
    Returns:
        predictions: all predictions
        val_scores: validation score of each fold
        models: list of trained models
    """
    predictions = []
    val_scores = []
    models = []
 
    n_samples = len(X)
    start_idx = initial_train_size
 
    fold = 0
    while start_idx + val_size <= n_samples:
        print(f"Fold {fold + 1}")
 
        # Split the data
        train_end = start_idx
        val_start = start_idx
        val_end = start_idx + val_size
 
        print(f"  Train: 0 - {train_end}")
        print(f"  Val:   {val_start} - {val_end}")
 
        X_train, X_val = X[:train_end], X[val_start:val_end]
        y_train, y_val = y[:train_end], y[val_start:val_end]
 
        # Train the model
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
 
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
 
        # Predict
        y_pred = model.predict(X_val)
        predictions.extend(y_pred)
 
        # Evaluate
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        models.append(model)
 
        print(f"  Validation IC: {score:.4f}")
 
        # Roll forward
        start_idx += step
        fold += 1
 
    return predictions, val_scores, models

3. Purging and Embargo (Advanced)

3.1 Why Is Extra Protection Needed?

Problem scenario:

Suppose:

  • Last day of the training set: 2021-12-31
  • First day of the validation set: 2022-01-01
  • The label is the 5-day forward return

This means the label for 2021-12-31 is built from information up to 2022-01-05.

Although 2021-12-31 lies in the training set, its label contains information from the validation period!
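The scenario above is easy to check numerically. A minimal sketch (the calendar-day date range and the 5-day horizon are illustrative assumptions): for each training date, compute when its label window ends and flag dates whose window crosses the train/validation boundary.

```python
import pandas as pd

horizon = 5  # label = 5-day forward return
dates = pd.date_range("2021-12-01", "2022-01-31", freq="D")
train_end = pd.Timestamp("2021-12-31")

# A sample dated t carries a label computed from data up to t + horizon days
label_end = dates + pd.Timedelta(days=horizon)

# Training samples whose label window reaches into the validation period
contaminated = dates[(dates <= train_end) & (label_end > train_end)]
print(contaminated.strftime("%Y-%m-%d").tolist())
# The last `horizon` training days all carry validation-period information
```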

3.2 Purging

Principle:

Drop the last N days of the training set, where N equals the label's prediction horizon.

def train_val_test_split_with_purging(X, y, dates, 
                                       train_ratio=0.6, 
                                       val_ratio=0.2, 
                                       test_ratio=0.2,
                                       horizon=5):
    """
    Train/val/test split with purging.
    
    Args:
        X, y: features and labels
        dates: date array, shape=[n_samples]
        train_ratio, val_ratio, test_ratio: split proportions
        horizon: prediction horizon (days of future data used by the label)
        
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    
    # Unique, sorted dates
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    
    train_end_date = unique_dates[train_end_idx]
    val_end_date = unique_dates[val_end_idx]
    
    print(f"Initial split:")
    print(f"  Train: {unique_dates[0]} ~ {train_end_date}")
    print(f"  Val:   {unique_dates[train_end_idx+1]} ~ {val_end_date}")
    print(f"  Test:  {unique_dates[val_end_idx+1]} ~ {unique_dates[-1]}")
    
    # Purging: drop the last `horizon` days of the training set
    purge_start_date = unique_dates[train_end_idx - horizon]
    
    # Build masks
    train_mask = dates <= purge_start_date
    valid_mask = (dates > train_end_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    
    print(f"\nPurging (dropping the last {horizon} days of the training set):")
    print(f"  Old training-set end: {train_end_date}")
    print(f"  New training-set end: {purge_start_date}")
    
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    
    print(f"\nFinal split:")
    print(f"  Train: {len(X_train)} samples")
    print(f"  Val:   {len(X_val)} samples")
    print(f"  Test:  {len(X_test)} samples")
    
    return (X_train, X_val, X_test), (y_train, y_val, y_test)
 
# Usage example
dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
X = np.random.randn(len(dates), 10)
y = np.random.randn(len(dates))
 
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_with_purging(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    horizon=5
)

3.3 Embargo

Principle:

Additionally leave a buffer of a few days at the start of the validation set.

def train_val_test_split_with_embargo(X, y, dates,
                                      train_ratio=0.6,
                                      val_ratio=0.2,
                                      test_ratio=0.2,
                                      horizon=5,
                                      embargo=3):
    """
    Train/val/test split with an embargo.
    
    Args:
        X, y: features and labels
        dates: date array
        train_ratio, val_ratio, test_ratio: split proportions
        horizon: prediction horizon (kept for interface symmetry; purging is not applied here)
        embargo: embargo length in days
        
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    
    # Unique, sorted dates
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    
    train_end_date = unique_dates[train_end_idx]
    val_end_date = unique_dates[val_end_idx]
    
    # Embargo: skip an extra `embargo` days at the start of the validation set
    embargo_start_date = unique_dates[train_end_idx + embargo]
    
    # Build masks
    train_mask = dates <= train_end_date
    valid_mask = (dates > embargo_start_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    
    print(f"\nEmbargo (skipping the first {embargo} days of the validation set):")
    print(f"  Old validation-set start: {unique_dates[train_end_idx+1]}")
    print(f"  New validation-set start: {embargo_start_date}")
    
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    
    print(f"\nFinal split:")
    print(f"  Train: {len(X_train)} samples")
    print(f"  Val:   {len(X_val)} samples ({embargo} days dropped)")
    print(f"  Test:  {len(X_test)} samples")
    
    return (X_train, X_val, X_test), (y_train, y_val, y_test)
 
# Usage example
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_with_embargo(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    horizon=5,
    embargo=3
)

3.4 Full Purging + Embargo

def train_val_test_split_full(X, y, dates,
                               train_ratio=0.6,
                               val_ratio=0.2,
                               test_ratio=0.2,
                               horizon=5,
                               embargo=3):
    """
    Full split with purging and embargo.
    
    Timeline:
    │  Train  │ Purge │ Embargo │   Val   │
    └─────────┘   ↓        ↓
              dropped    buffer
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    
    # Unique, sorted dates
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    
    # Purging: drop the last `horizon` days of the training set
    purge_start_date = unique_dates[train_end_idx - horizon]
    
    # Embargo: skip the first `embargo` days of the validation set
    embargo_start_date = unique_dates[train_end_idx + embargo]
    val_end_date = unique_dates[val_end_idx]
    
    # Build masks
    train_mask = dates <= purge_start_date
    valid_mask = (dates > embargo_start_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    
    print(f"Full split (purging + embargo):")
    print(f"  Train: {len(X_train)} samples (last {horizon} days dropped)")
    print(f"  Val:   {len(X_val)} samples (first {embargo} days dropped)")
    print(f"  Test:  {len(X_test)} samples")
    
    return (X_train, X_val, X_test), (y_train, y_val, y_test)
 
# Usage example
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_full(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    horizon=5,
    embargo=3
)

3.5 Walk-Forward Validation with Purging

A more realistic validation scheme that simulates live trading.

from scipy.stats import pearsonr
 
def walk_forward_validation_with_purging(X, y, dates, model_class, params,
                                          initial_train_size=252,
                                          val_size=21,
                                          step=21,
                                          horizon=5):
    """
    Walk-forward validation with purging.
    
    Simulates live trading:
    Window 1: Train(2020) → Test(2021 Q1)
    Window 2: Train(2020~2021 Q1) → Test(2021 Q2)
    Window 3: Train(2020~2021 Q2) → Test(2021 Q3)
    ...
    """
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    
    val_scores = []
    models = []
    windows = []
    
    start_idx = initial_train_size
    
    fold = 0
    while start_idx + val_size + horizon <= n_dates:
        print(f"\n{'='*60}")
        print(f"Window {fold + 1}")
        print(f"{'='*60}")
        
        # Training set: drop the last `horizon` days (purging)
        train_start = 0
        train_end = start_idx - horizon
        train_end_date = unique_dates[train_end]
        
        # Validation set: starts at start_idx
        val_start = start_idx
        val_end = start_idx + val_size
        val_start_date = unique_dates[val_start]
        val_end_date = unique_dates[val_end]
        
        print(f"Train: {unique_dates[0]} ~ {train_end_date}")
        print(f"Val:   {val_start_date} ~ {val_end_date}")
        print(f"Purging: dropped {unique_dates[train_end+1]} ~ {unique_dates[start_idx-1]} ({horizon} days)")
        
        # Split the data
        train_mask = (dates >= unique_dates[0]) & (dates <= train_end_date)
        val_mask = (dates >= val_start_date) & (dates <= val_end_date)
        
        X_train, y_train = X[train_mask], y[train_mask]
        X_val, y_val = X[val_mask], y[val_mask]
        
        print(f"Train samples: {len(X_train)}, val samples: {len(X_val)}")
        
        # Train the model
        model = model_class(**params)
        model.fit(X_train, y_train)
        
        # Predict
        y_pred = model.predict(X_val)
        
        # Evaluate
        ic = pearsonr(y_pred, y_val)[0]
        val_scores.append(ic)
        models.append(model)
        
        windows.append({
            'train_start': unique_dates[0],
            'train_end': train_end_date,
            'val_start': val_start_date,
            'val_end': val_end_date,
            'ic': ic
        })
        
        print(f"Validation IC: {ic:.4f}")
        
        # Roll forward
        start_idx += step
        fold += 1
    
    # Summary statistics
    print(f"\n{'='*60}")
    print("Walk-forward validation statistics")
    print(f"{'='*60}")
    print(f"Windows: {len(windows)}")
    print(f"Mean IC: {np.mean(val_scores):.4f}")
    print(f"IC std: {np.std(val_scores):.4f}")
    print(f"ICIR: {np.mean(val_scores) / np.std(val_scores):.4f}")
    print(f"IC hit rate: {(np.array(val_scores) > 0).mean():.2%}")
    
    return val_scores, models, windows
 
# Usage example
from lightgbm import LGBMRegressor
 
params = {
    'objective': 'regression',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'verbosity': -1,
}
 
val_scores, models, windows = walk_forward_validation_with_purging(
    X, y, dates, LGBMRegressor, params,
    initial_train_size=252,  # 1 year
    val_size=21,             # 1 month
    step=21,                 # 1 month
    horizon=5                # 5 days
)
 
# Plot the IC series
import matplotlib.pyplot as plt
 
plt.figure(figsize=(12, 6))
plt.plot([w['ic'] for w in windows], 'o-', linewidth=2, markersize=8)
plt.axhline(y=np.mean(val_scores), color='r', linestyle='--', 
            label=f'Mean IC: {np.mean(val_scores):.4f}')
plt.axhline(y=0, color='gray', linestyle='-', alpha=0.3)
plt.xlabel('Window')
plt.ylabel('IC')
plt.title('Walk-Forward Validation IC')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

Example output:

Window 1 (2024-Q1): IC = 0.0536
Window 2 (2024-Q2): IC = 0.0456
Window 3 (2024-Q3): IC = 0.0408
Window 4 (2024-Q4): IC = 0.0305

Mean IC: 0.0426 ± 0.0083

→ If IC declines over time, the model needs retraining
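The retraining heuristic above can be quantified by fitting a linear trend to the per-window IC series; a clearly negative slope signals decay. A minimal sketch using the illustrative IC values from the example output:

```python
import numpy as np

ic_series = np.array([0.0536, 0.0456, 0.0408, 0.0305])  # IC per window

# Least-squares fit of IC against the window index
slope, intercept = np.polyfit(np.arange(len(ic_series)), ic_series, deg=1)

if slope < 0:
    # Naive extrapolation of when IC hits zero, assuming linear decay
    windows_to_zero = -intercept / slope
    print(f"IC decays by {-slope:.4f} per window; "
          f"reaches zero in about {windows_to_zero:.1f} windows")
```

In practice one would require the slope to be negative over several consecutive refits, not a single four-point fit, before triggering retraining.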

4. Data-Splitting Strategies for Quant Scenarios

4.1 Train/Validation/Test Split

Recommended proportions

Timeline: |======== Train ======|==== Val ====|=== Test ===|
          2018-2021            2022         2023
          4 years              1 year       1 year

Implementation

def train_val_test_split(X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
    """
    Three-way split of time-series data.
 
    Args:
        train_ratio: training-set proportion
        val_ratio: validation-set proportion
        test_ratio: test-set proportion
 
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
 
    n_samples = len(X)
    train_end = int(n_samples * train_ratio)
    val_end = int(n_samples * (train_ratio + val_ratio))
 
    X_train, X_val, X_test = X[:train_end], X[train_end:val_end], X[val_end:]
    y_train, y_val, y_test = y[:train_end], y[train_end:val_end], y[val_end:]
 
    return (X_train, X_val, X_test), (y_train, y_val, y_test)

In practice

# Example: 4 years of training, 1 year of validation, 1 year of testing
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split(
    X, y,
    train_ratio=4/6,
    val_ratio=1/6,
    test_ratio=1/6
)
 
print(f"Train: {len(X_train)} samples")
print(f"Val:   {len(X_val)} samples")
print(f"Test:  {len(X_test)} samples")

4.2 Splitting Across Market Regimes

Why consider market regimes?

Factor performance differs drastically across market states (bull, bear, sideways).

Identifying the market regime

def identify_market_regime(prices, window=252):
    """
    Identify the market regime.
 
    Args:
        prices: price series (pandas Series), shape=[n_samples]
        window: rolling-window size
 
    Returns:
        regimes: regime labels (0 = sideways, 1 = bull, -1 = bear)
    """
    # Returns
    returns = prices.pct_change()
 
    # Rolling trend and volatility
    trend = returns.rolling(window).mean()
    volatility = returns.rolling(window).std()
 
    # Threshold
    trend_threshold = trend.mean() + 0.5 * volatility.mean()
 
    regimes = np.zeros(len(prices))
 
    for i in range(window, len(prices)):
        if trend.iloc[i] > trend_threshold:
            regimes[i] = 1   # bull
        elif trend.iloc[i] < -trend_threshold:
            regimes[i] = -1  # bear
        else:
            regimes[i] = 0   # sideways
 
    return regimes

Splitting the data by regime

def split_by_regime(X, y, regimes):
    """
    Split the data by market regime.
 
    Returns:
        dict: {regime: (X_regime, y_regime)}
    """
    regime_splits = {}
 
    for regime in [-1, 0, 1]:
        mask = regimes == regime
        regime_splits[regime] = (X[mask], y[mask])
 
    return regime_splits
 
# Usage example
regime_splits = split_by_regime(X, y, regimes)
 
for regime, (X_regime, y_regime) in regime_splits.items():
    regime_name = {1: 'bull', 0: 'sideways', -1: 'bear'}[regime]
    print(f"{regime_name}: {len(X_regime)} samples")

Balancing samples across regimes

def balance_regime_samples(X, y, regimes, target_ratio=0.3):
    """
    Balance the sample proportions of the different market regimes.
 
    Args:
        target_ratio: target proportion for bull and bear samples
 
    Returns:
        (X_balanced, y_balanced)
    """
    # Current proportions
    bull_ratio = np.sum(regimes == 1) / len(regimes)
    bear_ratio = np.sum(regimes == -1) / len(regimes)
 
    # Resampling factors
    bull_factor = target_ratio / bull_ratio if bull_ratio > 0 else 0
    bear_factor = target_ratio / bear_ratio if bear_ratio > 0 else 0
 
    X_balanced, y_balanced = [], []
 
    for regime in [-1, 0, 1]:
        mask = regimes == regime
        X_regime, y_regime = X[mask], y[mask]
 
        if regime == 1:
            factor = bull_factor
        elif regime == -1:
            factor = bear_factor
        else:
            factor = 1.0
 
        # Oversample or subsample
        n_samples = int(len(X_regime) * factor)
        if factor > 1:
            # Sample with replacement
            indices = np.random.choice(len(X_regime), n_samples, replace=True)
        else:
            # Sample without replacement
            indices = np.random.choice(len(X_regime), n_samples, replace=False)
 
        X_balanced.append(X_regime[indices])
        y_balanced.append(y_regime[indices])
 
    X_balanced = np.vstack(X_balanced)
    y_balanced = np.hstack(y_balanced)
 
    return X_balanced, y_balanced

4.3 Splitting by Industry

Industry heterogeneity

Stock characteristics and return patterns differ markedly across industries, so an industry-aware split is needed.

Implementation

def split_by_industry(X, y, industry_codes):
    """
    Split the data by industry.
 
    Args:
        industry_codes: industry codes, shape=[n_samples]
 
    Returns:
        dict: {industry_code: (X_industry, y_industry)}
    """
    industry_splits = {}
 
    for code in np.unique(industry_codes):
        mask = industry_codes == code
        industry_splits[code] = (X[mask], y[mask])
 
    return industry_splits

Industry neutralization

def industry_neutralize(X, y, industry_codes):
    """
    Industry neutralization: remove industry effects.
 
    Method: standardize within each industry.
    """
    X_neutral = np.zeros_like(X)
    y_neutral = np.zeros_like(y)
 
    for code in np.unique(industry_codes):
        mask = industry_codes == code
 
        # Standardize the features
        X_industry = X[mask]
        X_mean = X_industry.mean(axis=0)
        X_std = X_industry.std(axis=0)
        X_neutral[mask] = (X_industry - X_mean) / (X_std + 1e-8)
 
        # Standardize the target
        y_industry = y[mask]
        y_mean = y_industry.mean()
        y_std = y_industry.std()
        y_neutral[mask] = (y_industry - y_mean) / (y_std + 1e-8)
 
    return X_neutral, y_neutral

5. Best Practices for Data Splitting

5.1 A Complete Splitting Pipeline

class QuantDataSplitter:
    """
    Data splitter for quant workflows.
 
    Features:
    1. Time-series split
    2. Market-regime identification
    3. Industry neutralization
    4. Split validation
    """
 
    def __init__(self, prices, industry_codes=None):
        self.prices = prices
        self.industry_codes = industry_codes
 
    def identify_regimes(self, window=252):
        """Identify market regimes"""
        returns = self.prices.pct_change()
        trend = returns.rolling(window).mean()
        volatility = returns.rolling(window).std()
 
        trend_threshold = trend.mean() + 0.5 * volatility.mean()
        regimes = np.zeros(len(self.prices))
 
        for i in range(window, len(self.prices)):
            if trend.iloc[i] > trend_threshold:
                regimes[i] = 1
            elif trend.iloc[i] < -trend_threshold:
                regimes[i] = -1
            else:
                regimes[i] = 0
 
        self.regimes = regimes
        return regimes
 
    def train_val_test_split(self, X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
        """Three-way split"""
        assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
 
        n_samples = len(X)
        train_end = int(n_samples * train_ratio)
        val_end = int(n_samples * (train_ratio + val_ratio))
 
        splits = {
            'train': (X[:train_end], y[:train_end]),
            'val': (X[train_end:val_end], y[train_end:val_end]),
            'test': (X[val_end:], y[val_end:])
        }
 
        return splits
 
    def validate_no_leakage(self, X_train, X_val):
        """Sanity-check for distribution shift between train and validation"""
        train_mean = X_train.mean(axis=0)
        train_std = X_train.std(axis=0)
        val_mean = X_val.mean(axis=0)
 
        # Standardized mean differences
        mean_diff = np.abs(train_mean - val_mean) / (train_std + 1e-8)
 
        if np.any(mean_diff > 3):
            print("Warning: train and validation distributions differ substantially")
            print(f"Max standardized difference: {np.max(mean_diff):.4f}")
 
        return mean_diff
 
    def split(self, X, y, neutralize_industry=True):
        """Full splitting pipeline"""
        # 1. Identify market regimes
        self.identify_regimes()
 
        # 2. Industry neutralization (module-level function defined earlier)
        if self.industry_codes is not None and neutralize_industry:
            X, y = industry_neutralize(X, y, self.industry_codes)
 
        # 3. Three-way split
        splits = self.train_val_test_split(X, y)
 
        # 4. Check for leakage
        X_train, X_val = splits['train'][0], splits['val'][0]
        self.validate_no_leakage(X_train, X_val)
 
        return splits

Usage example

# Create the splitter
splitter = QuantDataSplitter(prices, industry_codes)
 
# Split the data
splits = splitter.split(X, y)
 
# Extract the subsets
X_train, y_train = splits['train']
X_val, y_val = splits['val']
X_test, y_test = splits['test']
 
# Print statistics
print(f"Train: {len(X_train)} samples")
print(f"Val:   {len(X_val)} samples")
print(f"Test:  {len(X_test)} samples")

5.2 Data-Splitting Checklist

Before splitting

  • Data sorted by time
  • Future information removed from features
  • No NaN values, or NaNs handled sensibly
  • Feature standardization deferred until after the split
  • Industry / market-regime information complete

After splitting

  • Train, validation, and test sets arranged in chronological order
  • Validation and test sets strictly after the training set
  • Distribution differences checked (mean, variance)
  • Target distribution confirmed reasonable
  • Split proportions and date ranges recorded

Evaluation checks

  • Train, validation, and test sets evaluated separately
  • IC / Rank IC stability checked across subsets
  • Degree of overfitting analyzed
  • Out-of-sample performance decay checked
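The time-ordering items in the checklist can be automated. A minimal sketch (the helper name and synthetic dates are assumptions for illustration) that verifies the three subsets are strictly chronological and non-overlapping:

```python
import pandas as pd

def check_split_order(train_dates, val_dates, test_dates):
    """Return True iff train < val < test with no temporal overlap."""
    ok = train_dates.max() < val_dates.min() and val_dates.max() < test_dates.min()
    if not ok:
        print("Warning: subsets overlap in time; the split leaks future information")
    return ok

# Illustrative 60/20/20 chronological split of daily dates
dates = pd.date_range("2018-01-01", "2023-12-31", freq="D")
n = len(dates)
train_d = dates[: int(n * 0.6)]
val_d = dates[int(n * 0.6): int(n * 0.8)]
test_d = dates[int(n * 0.8):]

print(check_split_order(train_d, val_d, test_d))  # chronological split passes
```

Running such a check right after every split makes the "validation and test strictly after training" item impossible to miss.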

6. Summary

Splitting time-series data is a critical step in quantitative investing; the core principle is strict adherence to the causality constraint. The main methods are:

  1. Time-series cross-validation: the training set always precedes the validation set
  2. Rolling-window validation: adapts to distribution shift over time
  3. Walk-forward validation: simulates live trading
  4. Multi-dimensional splits: account for market regime, industry, and similar factors

Correct data splitting is the foundation of a robust quant model; strict splitting conventions must be established at the very start of model development.