Time-Series Data Splitting

1. Why Random Splitting Fails

1.1 The Causality Principle

The core of quantitative investing is predicting the future from the past. Randomly shuffling the data breaks this causal ordering.

import numpy as np
import pandas as pd
 
# Simulate stock data
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
n = len(dates)
 
# Generate returns with a time trend (later returns are higher);
# the trend term is sized so it shows up in yearly means over the noise
trend = np.linspace(0, 0.3, n)
returns = 0.5 * trend + np.random.randn(n) * 0.3
 
df = pd.DataFrame({
    'date': dates,
    'stock_id': 'STOCK001',
    'return': returns
}).set_index('date')
 
print(f"2020 mean return: {df.loc['2020', 'return'].mean():.4f}")
print(f"2023 mean return: {df.loc['2023', 'return'].mean():.4f}")

Example output:

2020 mean return: -0.0234
2023 mean return: 0.1256

The data clearly has temporal structure.

1.2 Problems with Random Splitting

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
 
# Feature: a simple lagged return
df['feature'] = df['return'].shift(1).fillna(0)
 
X = df[['feature']].values
y = df['return'].values
 
# Wrong: random split
X_train_rand, X_test_rand, y_train_rand, y_test_rand = train_test_split(
    X, y, test_size=0.3, random_state=42, shuffle=True
)
 
model_rand = LinearRegression()
model_rand.fit(X_train_rand, y_train_rand)
mse_rand = mean_squared_error(y_test_rand, model_rand.predict(X_test_rand))
 
# Correct: time-ordered split
split_idx = int(0.7 * len(df))
X_train_time, X_test_time = X[:split_idx], X[split_idx:]
y_train_time, y_test_time = y[:split_idx], y[split_idx:]
 
model_time = LinearRegression()
model_time.fit(X_train_time, y_train_time)
mse_time = mean_squared_error(y_test_time, model_time.predict(X_test_time))
 
print(f"Random split MSE: {mse_rand:.6f}")
print(f"Time-ordered split MSE: {mse_time:.6f}")
print("The random split looks better - but this is illusory...")

Problem analysis:

| Issue | Random split | Time-ordered split |
|-------|--------------|--------------------|
| Data leakage | The test set can contain information from the training set's "future" | Strictly split by time |
| Evaluation bias | Overly optimistic | Reflects live performance |
| Gap vs. live trading | Large | Small |

1.3 Types of Data Leakage

# Example 1: look-ahead bias
# Wrong: building a feature from future information
df_wrong = df.copy()
df_wrong['future_volatility'] = df_wrong['return'].rolling(20).std().shift(-20)  # ❌ uses future data
 
# Correct: use only past information
df_correct = df.copy()
df_correct['historical_volatility'] = df_correct['return'].rolling(20).std()  # ✅
 
# Example 2: target leakage
# Wrong: a feature highly correlated with the target but unavailable at prediction time
df_wrong['high_correlation_feature'] = df_wrong['return'] * 0.95 + np.random.randn(len(df)) * 0.01  # ❌
 
# Example 3: statistical leakage
# Wrong: global standardization before splitting
from sklearn.preprocessing import StandardScaler
 
# ❌ Wrong: global standardization leaks test-set statistics
scaler_wrong = StandardScaler()
X_scaled_wrong = scaler_wrong.fit_transform(X)  # fit on ALL the data
X_train_rand_wrong, X_test_rand_wrong, _, _ = train_test_split(
    X_scaled_wrong, y, test_size=0.3, shuffle=True
)
 
# ✅ Correct: fit the statistics on the training set only
scaler_correct = StandardScaler()
X_train_time_scaled = scaler_correct.fit_transform(X_train_time)
X_test_time_scaled = scaler_correct.transform(X_test_time)  # transform only

2. Simple Time-Ordered Splits

2.1 Basic Split

The simplest approach: cut the data once, in chronological order.

def time_split(df, test_size=0.2, val_size=0.1):
    """
    Simple time-ordered split.
 
    Args:
        df: DataFrame with a time index
        test_size: test-set fraction
        val_size: validation-set fraction
 
    Returns:
        train_df, val_df, test_df
    """
    n = len(df)
 
    # Compute the split points
    test_start_idx = int(n * (1 - test_size))
    val_start_idx = int(test_start_idx * (1 - val_size / (1 - test_size)))
 
    train_df = df.iloc[:val_start_idx]
    val_df = df.iloc[val_start_idx:test_start_idx]
    test_df = df.iloc[test_start_idx:]
 
    return train_df, val_df, test_df
 
# Usage
train, val, test = time_split(df, test_size=0.2, val_size=0.1)
 
print(f"Train: {train.index[0]} to {train.index[-1]}, {len(train)} rows")
print(f"Val:   {val.index[0]} to {val.index[-1]}, {len(val)} rows")
print(f"Test:  {test.index[0]} to {test.index[-1]}, {len(test)} rows")

2.2 Splitting by Date

A more intuitive variant: split at explicit dates.

def time_split_by_date(df, val_date, test_date):
    """
    Split at explicit dates.
 
    Args:
        df: DataFrame with a time index
        val_date: first date of the validation set (str or datetime)
        test_date: first date of the test set (str or datetime)
 
    Returns:
        train_df, val_df, test_df
    """
    train_df = df[df.index < val_date]
    val_df = df[(df.index >= val_date) & (df.index < test_date)]
    test_df = df[df.index >= test_date]
 
    return train_df, val_df, test_df
 
# Usage
train, val, test = time_split_by_date(
    df,
    val_date='2023-01-01',
    test_date='2023-07-01'
)
 
print(f"Train ends on: {train.index[-1].date()}")
print(f"Val: {val.index[0].date()} to {val.index[-1].date()}")
print(f"Test starts on: {test.index[0].date()}")

2.3 Suggested Split Ratios

| Data span | Train | Val | Test | Notes |
|-----------|-------|-----|------|-------|
| < 1 year | 60% | 20% | 20% | Little data; keep the test set relatively large |
| 1-3 years | 70% | 15% | 15% | Standard split |
| > 3 years | 80% | 10% | 10% | Plenty of data; the test set can be smaller |

Rule of thumb: the test set should span at least one full market cycle.
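
The table above can be encoded as a small helper. This is a sketch: `choose_split_ratios` is a name introduced here, and the day thresholds simply restate the table's rows.

```python
def choose_split_ratios(n_days: int) -> tuple[float, float, float]:
    """Return (train, val, test) fractions based on the span of daily data."""
    if n_days < 365:          # under one year of data
        return 0.60, 0.20, 0.20
    elif n_days <= 3 * 365:   # one to three years
        return 0.70, 0.15, 0.15
    else:                     # more than three years
        return 0.80, 0.10, 0.10

train_frac, val_frac, test_frac = choose_split_ratios(2 * 365)
print(train_frac, val_frac, test_frac)  # → 0.7 0.15 0.15
```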


3. Rolling-Window Validation

3.1 How It Works

A rolling window keeps the training window at a fixed length and rolls it forward through time.

Train window: [=== fixed length ===]
Test window:          [==]

Fold 1: [====][==]
Fold 2:     [====][==]
Fold 3:         [====][==]

Pros:

  • Stable training distribution
  • The model is not affected by stale early data

Cons:

  • Each fold trains on less data than an expanding window would use
  • Older history is discarded

3.2 Implementation

import numpy as np
import pandas as pd
 
class RollingWindowCV:
    """Rolling-window cross-validation."""
 
    def __init__(self, train_size=252, test_size=63, step=21):
        """
        Args:
            train_size: training window (default ~1 year of trading days)
            test_size: test window (default ~1 quarter of trading days)
            step: roll-forward step (default ~1 month of trading days)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.step = step
 
    def split(self, X, y=None):
        """
        Generate train/test indices.
 
        Args:
            X: array-like, shape (n_samples, n_features)
            y: array-like, shape (n_samples,), optional
 
        Yields:
            (train_idx, test_idx) pairs
        """
        n_samples = X.shape[0]
 
        # Number of possible splits (each needs a full test window)
        n_splits = (n_samples - self.train_size - self.test_size) // self.step + 1
 
        if n_splits <= 0:
            raise ValueError("Not enough data for a rolling-window split")
 
        for i in range(n_splits):
            test_start = self.train_size + i * self.step
 
            # Keep the test window inside the data range
            if test_start + self.test_size > n_samples:
                break
 
            train_start = test_start - self.train_size
            test_end = min(test_start + self.test_size, n_samples)
 
            train_idx = np.arange(train_start, test_start)
            test_idx = np.arange(test_start, test_end)
 
            yield train_idx, test_idx
 
    def get_n_splits(self, X=None, y=None):
        """Number of splits (consistent with split())."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        return max(0, (n_samples - self.train_size - self.test_size) // self.step + 1)
 
# Usage
cv = RollingWindowCV(train_size=500, test_size=100, step=50)
 
splits = list(cv.split(df))
print(f"{len(splits)} folds in total")
 
for i, (train_idx, test_idx) in enumerate(splits[:3]):
    print(f"\nFold {i+1}:")
    print(f"  Train: indices {train_idx[0]}-{train_idx[-1]}, dates {df.index[train_idx[0]].date()} to {df.index[train_idx[-1]].date()}")
    print(f"  Test: indices {test_idx[0]}-{test_idx[-1]}, dates {df.index[test_idx[0]].date()} to {df.index[test_idx[-1]].date()}")

3.3 Visualization

import matplotlib.pyplot as plt
 
def plot_rolling_window_splits(df, cv, max_splits=5):
    """Visualize rolling-window splits."""
    splits = list(cv.split(df))[:max_splits]
    fig, ax = plt.subplots(figsize=(12, 3))
 
    for i, (train_idx, test_idx) in enumerate(splits):
        # Training window (blue)
        ax.barh(i, len(train_idx), left=train_idx[0],
                height=0.6, color='steelblue', label='Train' if i == 0 else '')
        # Test window (red)
        ax.barh(i, len(test_idx), left=test_idx[0],
                height=0.6, color='salmon', label='Test' if i == 0 else '')
 
    ax.set_yticks(np.arange(len(splits)))
    ax.set_yticklabels([f'Fold {i+1}' for i in range(len(splits))])
    ax.set_xlabel('Sample Index')
    ax.set_title('Rolling Window Cross-Validation')
    ax.legend(loc='upper right')
    plt.tight_layout()
    plt.show()
 
# Plot
plot_rolling_window_splits(df, cv, max_splits=5)

4. Expanding-Window Validation

4.1 How It Works

An expanding window adds each new period to the training set, so the training set keeps growing.

Fold 1: [====][==]
Fold 2: [======][==]
Fold 3: [========][==]

Pros:

  • Uses all available history
  • Suits settings where data accumulates over time

Cons:

  • Training time grows with each fold
  • Stale early data may drag on the model

4.2 Implementation

class ExpandingWindowCV:
    """Expanding-window cross-validation."""
 
    def __init__(self, min_train_size=252, test_size=63, step=21):
        """
        Args:
            min_train_size: minimum training window
            test_size: test window
            step: roll-forward step
        """
        self.min_train_size = min_train_size
        self.test_size = test_size
        self.step = step
 
    def split(self, X, y=None):
        """Generate train/test indices."""
        n_samples = X.shape[0]
 
        # Number of possible splits
        n_splits = (n_samples - self.min_train_size - self.test_size) // self.step + 1
 
        if n_splits <= 0:
            raise ValueError("Not enough data for an expanding-window split")
 
        for i in range(n_splits):
            test_start = self.min_train_size + i * self.step
 
            # Keep the test window inside the data range
            if test_start + self.test_size > n_samples:
                break
 
            test_end = min(test_start + self.test_size, n_samples)
 
            train_idx = np.arange(0, test_start)
            test_idx = np.arange(test_start, test_end)
 
            yield train_idx, test_idx
 
    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        return (n_samples - self.min_train_size - self.test_size) // self.step + 1
 
# Usage
cv_expanding = ExpandingWindowCV(min_train_size=500, test_size=100, step=50)
 
splits = list(cv_expanding.split(df))
print(f"{len(splits)} folds in total")
 
for i, (train_idx, test_idx) in enumerate(splits[:3]):
    print(f"\nFold {i+1}:")
    print(f"  Train: {len(train_idx)} samples, indices {train_idx[0]}-{train_idx[-1]}")
    print(f"  Test: {len(test_idx)} samples, indices {test_idx[0]}-{test_idx[-1]}")

4.3 Rolling vs. Expanding

# Compare training-set sizes under the two schemes
cv_rolling = RollingWindowCV(train_size=500, test_size=100, step=50)
cv_expanding = ExpandingWindowCV(min_train_size=500, test_size=100, step=50)
 
rolling_splits = list(cv_rolling.split(df))
expanding_splits = list(cv_expanding.split(df))
 
print("Training-set sizes:\n")
print("Fold | Rolling | Expanding")
print("-----|---------|----------")
for i in range(min(5, len(rolling_splits))):
    rolling_train = len(rolling_splits[i][0])
    expanding_train = len(expanding_splits[i][0])
    print(f" {i+1:2d}  |  {rolling_train:4d}   |   {expanding_train:4d}")

5. Walk-Forward Validation

5.1 How It Works

Walk-forward validation is the gold standard of quantitative backtesting; it mimics live trading:

  1. Train the model on historical data
  2. Test on the following period
  3. Roll forward and repeat

Train | Test | Retrain | Test | Retrain | Test
[====][==]      [====][==]           [====][==]

5.2 Full Implementation

class WalkForwardCV:
    """
    Walk-forward cross-validation.
 
    The validation scheme closest to live trading: it simulates a strategy
    that retrains its model on a fixed schedule.
    """
 
    def __init__(self, train_size=504, test_size=126, retrain_freq=63):
        """
        Args:
            train_size: training window (default ~2 years of trading days)
            test_size: test window (default ~half a year of trading days)
            retrain_freq: retraining frequency (default ~1 quarter of trading days)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.retrain_freq = retrain_freq
 
    def split(self, X, y=None):
        """Generate train/test indices."""
        n_samples = X.shape[0]
 
        # Start positions of the test periods
        test_starts = range(self.train_size, n_samples - self.test_size + 1, self.retrain_freq)
 
        for test_start in test_starts:
            train_end = test_start
            test_end = min(test_start + self.test_size, n_samples)
 
            train_idx = np.arange(max(0, train_end - self.train_size), train_end)
            test_idx = np.arange(test_start, test_end)
 
            if len(train_idx) < self.train_size // 2:  # skip folds with too little training data
                continue
 
            yield train_idx, test_idx
 
    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        return len(range(self.train_size, n_samples - self.test_size + 1, self.retrain_freq))
 
# Usage
cv_wf = WalkForwardCV(train_size=500, test_size=100, retrain_freq=50)
 
splits = list(cv_wf.split(df))
print(f"{len(splits)} walk-forward periods in total")
 
for i, (train_idx, test_idx) in enumerate(splits[:3]):
    print(f"\nPeriod {i+1}:")
    print(f"  Train: {df.index[train_idx[0]].date()} to {df.index[train_idx[-1]].date()} ({len(train_idx)} days)")
    print(f"  Test: {df.index[test_idx[0]].date()} to {df.index[test_idx[-1]].date()} ({len(test_idx)} days)")

5.3 Walk-Forward Backtest Example

from sklearn.metrics import mean_squared_error
import lightgbm as lgb
 
def walk_forward_backtest(df, cv, model_params=None):
    """
    Walk-forward backtest.
 
    Args:
        df: DataFrame with feature and target columns
        cv: a WalkForwardCV instance
        model_params: dict of model parameters
 
    Returns:
        results: DataFrame with one row of results per period
    """
    if model_params is None:
        model_params = {
            'n_estimators': 100,
            'max_depth': 4,
            'learning_rate': 0.05,
            'verbose': -1
        }
 
    results = []
 
    for i, (train_idx, test_idx) in enumerate(cv.split(df)):
        # Prepare the data
        X_train = df[['feature']].iloc[train_idx].values
        y_train = df['return'].iloc[train_idx].values
        X_test = df[['feature']].iloc[test_idx].values
        y_test = df['return'].iloc[test_idx].values
 
        # Train the model
        model = lgb.LGBMRegressor(**model_params)
        model.fit(X_train, y_train)
 
        # Predict
        y_pred = model.predict(X_test)
 
        # Metrics
        mse = mean_squared_error(y_test, y_pred)
        ic = np.corrcoef(y_test, y_pred)[0, 1]
 
        results.append({
            'period': i + 1,
            'train_start': df.index[train_idx[0]].date(),
            'train_end': df.index[train_idx[-1]].date(),
            'test_start': df.index[test_idx[0]].date(),
            'test_end': df.index[test_idx[-1]].date(),
            'train_size': len(train_idx),
            'test_size': len(test_idx),
            'mse': mse,
            'ic': ic
        })
 
    return pd.DataFrame(results)
 
# Run the backtest
cv_wf = WalkForwardCV(train_size=500, test_size=100, retrain_freq=50)
results_df = walk_forward_backtest(df, cv_wf)
 
print("Walk-forward backtest results:")
print(results_df[['period', 'test_start', 'test_end', 'ic']].head(10))
 
# Aggregate performance
print(f"\nMean IC: {results_df['ic'].mean():.4f}")
print(f"IC std: {results_df['ic'].std():.4f}")
print(f"Share of periods with IC > 0: {(results_df['ic'] > 0).mean():.2%}")

6. Purging and Embargo

6.1 Background

Even a time-ordered split can still leak information:

  1. The purging problem: training samples just before the test period can be highly correlated with it, or their label windows can overlap it (they are too close in time).
  2. The embargo problem: samples right after the test period can carry its information into the next round of training.
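
As a minimal worked example of the first problem (assuming daily data and a hypothetical 20-day label horizon), purging drops the training samples whose label window reaches into the test period:

```python
import numpy as np

label_horizon = 20   # hypothetical look-ahead of each sample's label, in days
test_start = 500     # the test period begins at index 500
train_idx = np.arange(0, test_start)

# Drop training samples whose 20-day label window overlaps the test period:
# samples 480..499 are removed, because their labels are computed from days >= 500
purged_train_idx = train_idx[train_idx < test_start - label_horizon]

print(len(train_idx), len(purged_train_idx))  # → 500 480
```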

6.2 Purging: Removing the Overlap Period

Purging drops the training samples that overlap the test period in time.

class PurgedCV:
    """Time-series cross-validation with purging."""
 
    def __init__(self, train_size=500, test_size=100, purge_size=21):
        """
        Args:
            train_size: training window
            test_size: test window
            purge_size: purge period (removed from the end of the training window)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.purge_size = purge_size
 
    def split(self, X, y=None):
        """Generate train/test indices (with purging)."""
        n_samples = X.shape[0]
 
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.test_size):
            test_end = min(test_start + self.test_size, n_samples)
 
            # Training set: drop the purge period at its end
            train_end = test_start - self.purge_size
            train_start = max(0, train_end - self.train_size)
 
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
 
            if len(train_idx) < self.train_size // 2:
                continue
 
            yield train_idx, test_idx
 
# Visualize the purge
cv_purge = PurgedCV(train_size=500, test_size=100, purge_size=21)
 
fig, ax = plt.subplots(figsize=(12, 2))
 
for i, (train_idx, test_idx) in enumerate(cv_purge.split(df)):
    if i >= 3:
        break
 
    # Training window (blue)
    ax.barh(i, len(train_idx), left=train_idx[0], height=0.6, color='steelblue', label='Train' if i == 0 else '')
    # Purge period (yellow)
    ax.barh(i, cv_purge.purge_size, left=train_idx[-1] + 1, height=0.6, color='gold', label='Purge' if i == 0 else '')
    # Test window (red)
    ax.barh(i, len(test_idx), left=test_idx[0], height=0.6, color='salmon', label='Test' if i == 0 else '')
 
ax.set_yticks([0, 1, 2])
ax.set_yticklabels(['Fold 1', 'Fold 2', 'Fold 3'])
ax.set_xlabel('Sample Index')
ax.set_title('Purged Cross-Validation (Yellow = Removed from Training)')
ax.legend(loc='upper right')
plt.tight_layout()
plt.show()

6.3 Embargo: A Lock-Out Period After the Test Set

An embargo keeps test-set information from leaking into the next round of training.

class EmbargoCV:
    """Time-series cross-validation with an embargo."""
 
    def __init__(self, train_size=500, test_size=100, embargo_size=21):
        """
        Args:
            train_size: training window
            test_size: test window
            embargo_size: embargo period (samples after each test set barred from training)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.embargo_size = embargo_size
 
    def split(self, X, y=None):
        """Generate train/test indices (with an embargo)."""
        n_samples = X.shape[0]
        test_starts = []
 
        # Start positions of the test periods, spaced by the embargo
        current_test_start = self.train_size
        while current_test_start + self.test_size <= n_samples:
            test_starts.append(current_test_start)
            current_test_start += self.test_size + self.embargo_size
 
        for test_start in test_starts:
            test_end = min(test_start + self.test_size, n_samples)
 
            # Training window ending right before this test period
            train_end = test_start
            train_start = max(0, train_end - self.train_size)
 
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
 
            if len(train_idx) < self.train_size // 2:
                continue
 
            yield train_idx, test_idx

6.4 Combining Purging and Embargo

class PurgedEmbargoCV:
    """
    Time-series cross-validation with both purging and an embargo.
    The strictest standard for quantitative backtesting.
    """
 
    def __init__(self, train_size=500, test_size=100, purge_size=21, embargo_size=21):
        """
        Args:
            train_size: training window
            test_size: test window
            purge_size: samples purged from the end of each training window
            embargo_size: embargo samples after each test period
        """
        self.train_size = train_size
        self.test_size = test_size
        self.purge_size = purge_size
        self.embargo_size = embargo_size
 
    def split(self, X, y=None):
        """Generate train/test indices."""
        n_samples = X.shape[0]
        test_starts = []
 
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            test_starts.append(current_test_start)
            current_test_start += self.test_size + self.embargo_size
 
        for test_start in test_starts:
            test_end = min(test_start + self.test_size, n_samples)
 
            # Training set: drop the purge period at its end
            train_end = test_start - self.purge_size
            train_start = max(0, train_end - self.train_size)
 
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
 
            if len(train_idx) < self.train_size // 2:
                continue
 
            yield train_idx, test_idx
 
    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        count = 0
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            count += 1
            current_test_start += self.test_size + self.embargo_size
        return count
 
# Usage
cv_pe = PurgedEmbargoCV(train_size=500, test_size=100, purge_size=21, embargo_size=21)
print(f"{cv_pe.get_n_splits(df)} folds in total")

7. A Unified Time-Series CV Wrapper

7.1 A Unified Time-Series CV Class

from sklearn.base import clone
from sklearn.model_selection import BaseCrossValidator
 
class TimeSeriesCV(BaseCrossValidator):
    """
    Unified time-series cross-validation.
 
    Supported splitting strategies:
    - 'rolling': rolling window
    - 'expanding': expanding window
    - 'walk_forward': walk-forward
    - 'purged_embargo': walk-forward with purging and an embargo
    """
 
    def __init__(self, method='walk_forward', train_size=504, test_size=126,
                 step=63, purge_size=21, embargo_size=21):
        """
        Args:
            method: splitting strategy
            train_size: training window
            test_size: test window
            step: roll-forward step
            purge_size: purge period
            embargo_size: embargo period
        """
        self.method = method
        self.train_size = train_size
        self.test_size = test_size
        self.step = step
        self.purge_size = purge_size
        self.embargo_size = embargo_size
 
    def _get_rolling_splits(self, X):
        """Rolling-window splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(test_start - self.train_size, test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx
 
    def _get_expanding_splits(self, X):
        """Expanding-window splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(0, test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx
 
    def _get_walk_forward_splits(self, X):
        """Walk-forward splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(max(0, test_start - self.train_size), test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx
 
    def _get_purged_embargo_splits(self, X):
        """Splits with purging and an embargo."""
        n_samples = X.shape[0]
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            test_end = min(current_test_start + self.test_size, n_samples)
            train_end = current_test_start - self.purge_size
            train_idx = np.arange(max(0, train_end - self.train_size), train_end)
            test_idx = np.arange(current_test_start, test_end)
            if len(train_idx) >= self.train_size // 2:
                yield train_idx, test_idx
            current_test_start += self.test_size + self.embargo_size
 
    def split(self, X, y=None, groups=None):
        """Generate split indices."""
        if self.method == 'rolling':
            return self._get_rolling_splits(X)
        elif self.method == 'expanding':
            return self._get_expanding_splits(X)
        elif self.method == 'walk_forward':
            return self._get_walk_forward_splits(X)
        elif self.method == 'purged_embargo':
            return self._get_purged_embargo_splits(X)
        else:
            raise ValueError(f"Unknown method: {self.method}")
 
    def get_n_splits(self, X=None, y=None, groups=None):
        """Number of splits."""
        if X is None:
            return 0
        return len(list(self.split(X)))
 
# Usage
cv = TimeSeriesCV(method='purged_embargo', train_size=500, test_size=100,
                  step=50, purge_size=21, embargo_size=21)
 
print(f"Method: {cv.method}")
print(f"Number of splits: {cv.get_n_splits(df)}")

7.2 Integration with sklearn

from sklearn.model_selection import cross_val_score
 
# Create the time-series CV
cv_ts = TimeSeriesCV(method='walk_forward', train_size=500, test_size=100, step=100)
 
# Prepare the data
X = df[['feature']].values
y = df['return'].values
 
from sklearn.linear_model import LinearRegression
 
model = LinearRegression()
# Note: cross_val_score accepts any splitter with a split() method, so
# cross_val_score(model, X, y, cv=cv_ts) works as-is; the loop below
# makes explicit what it does on each fold.
 
def cross_val_score_time_series(model, X, y, cv):
    """Score a model under time-series cross-validation."""
    scores = []
    model = clone(model)
 
    for train_idx, test_idx in cv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
 
        model.fit(X_train, y_train)
        score = model.score(X_test, y_test)
        scores.append(score)
 
    return np.array(scores)
 
# Usage
scores = cross_val_score_time_series(model, X, y, cv_ts)
print(f"Per-fold R²: {scores}")
print(f"Mean R²: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

8. Common Mistakes

8.1 Mistake 1: Random Shuffling

# ❌ Wrong: shuffling time-series data
from sklearn.model_selection import KFold
 
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, test_idx in kf.split(X):
    # This leaks future data into training!
    pass
 
# ✅ Correct: use a time-series CV
cv = TimeSeriesCV(method='walk_forward', train_size=500, test_size=100)
for train_idx, test_idx in cv.split(X):
    # A properly time-ordered split
    pass

8.2 Mistake 2: Scaling Before Splitting

# ❌ Wrong: global standardization
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # uses statistics from ALL the data
# ...and only then split
 
# ✅ Correct: standardize inside the CV loop
from sklearn.pipeline import Pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', LinearRegression())
])
 
for train_idx, test_idx in cv.split(X):
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    pipeline.fit(X_train, y_train)  # the scaler is fit on the training fold only
    # the pipeline then applies the same scaling inside predict()/score()

8.3 Mistake 3: Incorrect Purging

# ❌ Wrong: trims a fixed amount from the training end, ignoring the test start
def wrong_purge(train_end, test_start):
    return train_end - 21  # just removes 21 days
 
# ✅ Correct: guarantee a sufficient gap before the test set
def correct_purge(train_end, test_start, purge_size=21):
    # training end + purge period <= test start
    return min(train_end, test_start - purge_size)

8.4 Mistake 4: Using Future Information in a Backtest

# ❌ Wrong: shifting a rolling statistic backward pulls in future data
df['rolling_mean'] = df['price'].rolling(20).mean()  # fine
df['rolling_mean'] = df['price'].rolling(20).mean().shift(-1)  # wrong! uses future data
 
# ❌ Wrong: feature engineered from the target itself
df['feature'] = df['return'] * 0.5  # builds the feature directly from the target
 
# ✅ Correct: use only past information (illustrative; assumes a 'price' column)
df['rolling_mean'] = df['price'].rolling(20).mean()
df['momentum'] = df['price'].pct_change(20)

Key Takeaways

Split Method Comparison

| Method | Training set | Use case | Pros | Cons |
|--------|--------------|----------|------|------|
| Simple split | Fixed | Quick validation | Simple | A single test, unstable |
| Rolling window | Fixed size | Production | Stable distribution | Discards older history |
| Expanding window | Grows over time | Research | Uses all the data | Training gets slower |
| Walk-Forward | Mimics live trading | Formal backtests | Closest to production | More complex to implement |

Purging and Embargo

| Technique | Purpose | Typical parameter |
|-----------|---------|-------------------|
| Purging | Remove the end of the training window where it overlaps the test set | 21 days (~1 month) |
| Embargo | Bar samples right after the test set from the next round of training | 21 days (~1 month) |

Choosing a Time-Series CV

Small data (< 1,000 samples)     → simple split or expanding window
Medium data (1,000-10,000)       → walk-forward
Large data (> 10,000)            → rolling window

High-frequency (minute bars)     → purge_size = 1 hour, embargo_size = 30 minutes
Daily data                       → purge_size = 5 days, embargo_size = 5 days
Weekly/monthly data              → purge_size = 1 period, embargo_size = 1 period

Key Principles

  1. Causality first: always predict the future from the past
  2. Avoid leakage: scaling and feature engineering must happen after (or inside) the split
  3. Simulate live trading: Walk-Forward with purging and an embargo is the strictest standard
  4. Test thoroughly: the test set should span at least one full market cycle
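
As a closing sketch, these principles can also be combined with stock sklearn pieces: TimeSeriesSplit's `gap` parameter plays the role of a purge period, and a Pipeline keeps scaling inside each fold. The synthetic data and parameter values here are illustrative only.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# Synthetic daily returns, with a lagged-return feature (past information only)
rng = np.random.default_rng(0)
ret = pd.Series(rng.normal(0, 0.01, 1000))
X = ret.shift(1).fillna(0).to_frame('lag1').values
y = ret.values

# gap=21 leaves a 21-sample purge gap between each training and test window;
# the scaler lives inside the pipeline, so it is fit on each training fold only
tscv = TimeSeriesSplit(n_splits=5, gap=21)
pipe = Pipeline([('scaler', StandardScaler()), ('model', LinearRegression())])

scores = []
for train_idx, test_idx in tscv.split(X):
    assert train_idx.max() < test_idx.min() - 21  # causality + purge gap hold
    pipe.fit(X[train_idx], y[train_idx])
    scores.append(pipe.score(X[test_idx], y[test_idx]))

print(f"mean out-of-sample R^2: {np.mean(scores):.4f}")
```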

Next: 03-模型训练与调优.md - how to train and tune tree models efficiently.