Time-Series Data Splitting
Contents
- 1. Why Random Splitting Fails
- 2. Simple Time-Series Split
- 3. Rolling-Window Validation
- 4. Expanding-Window Validation
- 5. Walk-Forward Validation
- 6. Purging and Embargo
- 7. A Unified Time-Series CV Wrapper
- 8. Common Mistakes
1. Why Random Splitting Fails
1.1 The Causality Principle
The core of quantitative investing is predicting the future from the past. Randomly shuffling the data violates this causal ordering.

```python
import numpy as np
import pandas as pd

# Simulate stock data
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
n = len(dates)

# Generate data with a time trend (performance drifts upward over time)
trend = np.linspace(0, 0.3, n)
returns = 0.02 * trend + np.random.randn(n) * 0.3
df = pd.DataFrame({
    'date': dates,
    'stock_id': 'STOCK001',
    'return': returns
}).set_index('date')

print(f"Mean return in 2020: {df.loc['2020', 'return'].mean():.4f}")
print(f"Mean return in 2023: {df.loc['2023', 'return'].mean():.4f}")
```

Example output:

```
Mean return in 2020: -0.0234
Mean return in 2023: 0.1256
```

The data clearly has temporal structure.
1.2 The Problem with Random Splits

```python
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Feature: a simple lagged return
df['feature'] = df['return'].shift(1).fillna(0)
X = df[['feature']].values
y = df['return'].values

# Wrong: random split
X_train_rand, X_test_rand, y_train_rand, y_test_rand = train_test_split(
    X, y, test_size=0.3, random_state=42, shuffle=True
)
model_rand = LinearRegression()
model_rand.fit(X_train_rand, y_train_rand)
mse_rand = mean_squared_error(y_test_rand, model_rand.predict(X_test_rand))

# Right: time-ordered split
split_idx = int(0.7 * len(df))
X_train_time, X_test_time = X[:split_idx], X[split_idx:]
y_train_time, y_test_time = y[:split_idx], y[split_idx:]
model_time = LinearRegression()
model_time.fit(X_train_time, y_train_time)
mse_time = mean_squared_error(y_test_time, model_time.predict(X_test_time))

print(f"Random-split MSE: {mse_rand:.6f}")
print(f"Time-split MSE: {mse_time:.6f}")
print("The random split looks better -- but the improvement is illusory.")
```

Analysis:

| Issue | Random split | Time-ordered split |
|---|---|---|
| Data leakage | Test set may contain information from the training set's "future" | Strict chronological cut |
| Evaluation bias | Overly optimistic | Reflects live performance |
| Gap vs. live trading | Large | Small |
1.3 Types of Data Leakage

```python
from sklearn.preprocessing import StandardScaler

# Example 1: look-ahead bias
# Wrong: building a feature from future information
df_wrong = df.copy()
df_wrong['future_volatility'] = df_wrong['return'].rolling(20).std().shift(-20)  # ❌ uses future data

# Right: use only historical information
df_correct = df.copy()
df_correct['historical_volatility'] = df_correct['return'].rolling(20).std()  # ✅

# Example 2: target leakage
# Wrong: a feature highly correlated with the target but unavailable at prediction time
df_wrong['high_correlation_feature'] = df_wrong['return'] * 0.95 + np.random.randn(len(df)) * 0.01  # ❌

# Example 3: statistical leakage
# Wrong: standardizing globally before splitting
# ❌ Fitting the scaler on all data leaks test-set statistics
scaler_wrong = StandardScaler()
X_scaled_wrong = scaler_wrong.fit_transform(X)  # fit on the full dataset
X_train_rand_wrong, X_test_rand_wrong, _, _ = train_test_split(
    X_scaled_wrong, y, test_size=0.3, shuffle=True
)

# ✅ Right: use only training-set statistics
scaler_correct = StandardScaler()
X_train_time_scaled = scaler_correct.fit_transform(X_train_time)
X_test_time_scaled = scaler_correct.transform(X_test_time)  # transform only
```

2. Simple Time-Series Split
2.1 Basic Split
The simplest approach: a single chronological cut.

```python
def time_split(df, test_size=0.2, val_size=0.1):
    """
    Simple time-series split.

    Parameters:
        df: DataFrame with a time index
        test_size: fraction of samples in the test set
        val_size: fraction of samples in the validation set

    Returns:
        train_df, val_df, test_df
    """
    n = len(df)
    # Compute the cut points
    test_start_idx = int(n * (1 - test_size))
    val_start_idx = int(test_start_idx * (1 - val_size / (1 - test_size)))
    train_df = df.iloc[:val_start_idx]
    val_df = df.iloc[val_start_idx:test_start_idx]
    test_df = df.iloc[test_start_idx:]
    return train_df, val_df, test_df

# Usage
train, val, test = time_split(df, test_size=0.2, val_size=0.1)
print(f"Train: {train.index[0]} to {train.index[-1]}, {len(train)} rows")
print(f"Validation: {val.index[0]} to {val.index[-1]}, {len(val)} rows")
print(f"Test: {test.index[0]} to {test.index[-1]}, {len(test)} rows")
```

2.2 Splitting by Date
A more intuitive variant: specify explicit dates.

```python
def time_split_by_date(df, val_date, test_date):
    """
    Split by date.

    Parameters:
        df: DataFrame with a time index
        val_date: start date of the validation set (str or datetime)
        test_date: start date of the test set (str or datetime)

    Returns:
        train_df, val_df, test_df
    """
    train_df = df[df.index < val_date]
    val_df = df[(df.index >= val_date) & (df.index < test_date)]
    test_df = df[df.index >= test_date]
    return train_df, val_df, test_df

# Usage
train, val, test = time_split_by_date(
    df,
    val_date='2023-01-01',
    test_date='2023-07-01'
)
print(f"Train ends: {train.index[-1].date()}")
print(f"Validation: {val.index[0].date()} to {val.index[-1].date()}")
print(f"Test starts: {test.index[0].date()}")
```

2.3 Suggested Split Ratios
| Data span | Train | Validation | Test | Notes |
|---|---|---|---|---|
| < 1 year | 60% | 20% | 20% | Little data; keep the test share large |
| 1-3 years | 70% | 15% | 15% | Standard split |
| > 3 years | 80% | 10% | 10% | Plenty of data; the test share can shrink |

Rule of thumb: the test set should cover at least one full market cycle.
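The table above can be encoded in a small helper. This is a hypothetical sketch: the function name `suggest_split_ratios` and the assumption of 252 trading days per year are mine, not part of any library.

```python
# Hypothetical helper mirroring the ratio table above; the thresholds and the
# 252-trading-day year are assumptions of this sketch.
def suggest_split_ratios(n_samples, days_per_year=252):
    """Return (train, val, test) ratios based on how many years of data we have."""
    years = n_samples / days_per_year
    if years < 1:
        return 0.6, 0.2, 0.2    # little data: keep the test set relatively large
    elif years <= 3:
        return 0.7, 0.15, 0.15  # standard split
    else:
        return 0.8, 0.1, 0.1    # plenty of data: a smaller test share suffices

print(suggest_split_ratios(200))      # under a year of data
print(suggest_split_ratios(5 * 252))  # five years of data
```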
3. Rolling-Window Validation
3.1 Idea
A rolling window keeps the training window at a fixed length and slides it forward through time.

```
Train window: [==== fixed length ====]
Test window:  [==]
Fold 1: [====][==]
Fold 2:    [====][==]
Fold 3:       [====][==]
```

Pros:
- Stable training distribution
- The model is not influenced by stale early data

Cons:
- Each fold trains on only a fixed slice of history
- Data older than the window is discarded
3.2 Implementation

```python
import numpy as np
import pandas as pd

class RollingWindowCV:
    """Rolling-window cross-validation."""

    def __init__(self, train_size=252, test_size=63, step=21):
        """
        Parameters:
            train_size: training window length (default ~1 trading year)
            test_size: test window length (default ~1 trading quarter)
            step: roll-forward step (default ~1 trading month)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.step = step

    def split(self, X, y=None):
        """
        Generate train/test indices.

        Parameters:
            X: array-like, shape (n_samples, n_features)
            y: array-like, shape (n_samples,), optional

        Yields:
            (train_idx, test_idx) pairs
        """
        n_samples = X.shape[0]
        # Number of windows whose test period fits inside the data
        n_splits = self.get_n_splits(X)
        if n_splits <= 0:
            raise ValueError("Not enough data for rolling-window splits")
        for i in range(n_splits):
            test_start = self.train_size + i * self.step
            # Keep the test window inside the data range
            if test_start + self.test_size > n_samples:
                break
            train_start = test_start - self.train_size
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(train_start, test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        # Count only windows whose test period fits inside the data
        return max(0, (n_samples - self.train_size - self.test_size) // self.step + 1)

# Usage
cv = RollingWindowCV(train_size=500, test_size=100, step=50)
splits = list(cv.split(df))
print(f"{len(splits)} folds in total")
for i, (train_idx, test_idx) in enumerate(splits[:3]):
    print(f"\nFold {i+1}:")
    print(f"  Train: indices {train_idx[0]}-{train_idx[-1]}, dates {df.index[train_idx[0]].date()}-{df.index[train_idx[-1]].date()}")
    print(f"  Test: indices {test_idx[0]}-{test_idx[-1]}, dates {df.index[test_idx[0]].date()}-{df.index[test_idx[-1]].date()}")
```

3.3 Visualization
```python
import matplotlib.pyplot as plt

def plot_rolling_window_splits(df, cv, max_splits=5):
    """Visualize rolling-window splits."""
    splits = list(cv.split(df))[:max_splits]  # materialize once instead of re-splitting
    fig, ax = plt.subplots(figsize=(12, 3))
    for i, (train_idx, test_idx) in enumerate(splits):
        # Training window (blue)
        ax.barh(i, len(train_idx), left=train_idx[0],
                height=0.6, color='steelblue', label='Train' if i == 0 else '')
        # Test window (red)
        ax.barh(i, len(test_idx), left=test_idx[0],
                height=0.6, color='salmon', label='Test' if i == 0 else '')
    ax.set_yticks(np.arange(len(splits)))
    ax.set_yticklabels([f'Fold {i+1}' for i in range(len(splits))])
    ax.set_xlabel('Sample Index')
    ax.set_title('Rolling Window Cross-Validation')
    ax.legend(loc='upper right')
    plt.tight_layout()
    plt.show()

# Plot
plot_rolling_window_splits(df, cv, max_splits=5)
```

4. Expanding-Window Validation
4.1 Idea
An expanding window adds each new slice of data to the training set, so the training set keeps growing.

```
Fold 1: [====][==]
Fold 2: [======][==]
Fold 3: [========][==]
```

Pros:
- Uses all available history
- Suits settings where data accumulates over time

Cons:
- Training time grows with each fold
- Stale early data may drag on the model
4.2 实现
class ExpandingWindowCV:
"""扩展窗口交叉验证"""
def __init__(self, min_train_size=252, test_size=63, step=21):
"""
参数:
min_train_size: 最小训练窗口大小
test_size: 测试窗口大小
step: 滚动步长
"""
self.min_train_size = min_train_size
self.test_size = test_size
self.step = step
def split(self, X, y=None):
"""生成训练/测试索引"""
n_samples = X.shape[0]
# 计算可能的划分数量
n_splits = (n_samples - self.min_train_size - self.test_size) // self.step + 1
if n_splits <= 0:
raise ValueError("数据量不足以进行扩展窗口划分")
for i in range(n_splits):
test_start = self.min_train_size + i * self.step
# 确保测试集不超过数据范围
if test_start + self.test_size > n_samples:
break
test_end = min(test_start + self.test_size, n_samples)
train_idx = np.arange(0, test_start)
test_idx = np.arange(test_start, test_end)
yield train_idx, test_idx
def get_n_splits(self, X=None, y=None):
"""获取划分数量"""
if X is None:
return 0
n_samples = X.shape[0]
return (n_samples - self.min_train_size - self.test_size) // self.step + 1
# 使用示例
cv_expanding = ExpandingWindowCV(min_train_size=500, test_size=100, step=50)
splits = list(cv_expanding.split(df))
print(f"总共 {len(splits)} 折划分")
for i, (train_idx, test_idx) in enumerate(splits[:3]):
print(f"\n第 {i+1} 折:")
print(f" 训练: {len(train_idx)} 样本, 索引 {train_idx[0]}-{train_idx[-1]}")
print(f" 测试: {len(test_idx)} 样本, 索引 {test_idx[0]}-{test_idx[-1]}")4.3 滚动 vs 扩展对比
```python
# Compare training-set sizes of the two schemes
cv_rolling = RollingWindowCV(train_size=500, test_size=100, step=50)
cv_expanding = ExpandingWindowCV(min_train_size=500, test_size=100, step=50)
rolling_splits = list(cv_rolling.split(df))      # materialize once
expanding_splits = list(cv_expanding.split(df))

print("Training-set sizes:\n")
print("Fold | Rolling | Expanding")
print("-----|---------|----------")
for i in range(min(5, len(rolling_splits))):
    print(f"  {i+1:2d} | {len(rolling_splits[i][0]):5d} | {len(expanding_splits[i][0]):7d}")
```

5. Walk-Forward Validation
5.1 Idea
Walk-forward validation is the gold standard for quantitative backtesting. It mimics live trading:
- Train the model on historical data
- Test it on the following period
- Roll forward and repeat

```
Train | Test | Retrain | Test | Retrain | Test
[====][==]   [====][==]   [====][==]
```

5.2 Full Implementation
```python
class WalkForwardCV:
    """
    Walk-forward cross-validation.

    The closest scheme to live trading: it simulates a strategy that
    retrains its model on a fixed schedule.
    """

    def __init__(self, train_size=504, test_size=126, retrain_freq=63):
        """
        Parameters:
            train_size: training window length (default ~2 trading years)
            test_size: test window length (default ~half a trading year)
            retrain_freq: retraining frequency (default ~1 trading quarter)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.retrain_freq = retrain_freq

    def split(self, X, y=None):
        """Generate train/test indices."""
        n_samples = X.shape[0]
        # Start positions of the test periods
        test_starts = range(self.train_size, n_samples - self.test_size + 1, self.retrain_freq)
        for test_start in test_starts:
            train_end = test_start
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(max(0, train_end - self.train_size), train_end)
            test_idx = np.arange(test_start, test_end)
            if len(train_idx) < self.train_size // 2:  # skip folds with too little training data
                continue
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        return len(range(self.train_size, n_samples - self.test_size + 1, self.retrain_freq))

# Usage
cv_wf = WalkForwardCV(train_size=500, test_size=100, retrain_freq=50)
splits = list(cv_wf.split(df))
print(f"{len(splits)} walk-forward periods in total")
for i, (train_idx, test_idx) in enumerate(splits[:3]):
    print(f"\nPeriod {i+1}:")
    print(f"  Train: {df.index[train_idx[0]].date()} to {df.index[train_idx[-1]].date()} ({len(train_idx)} days)")
    print(f"  Test: {df.index[test_idx[0]].date()} to {df.index[test_idx[-1]].date()} ({len(test_idx)} days)")
```

5.3 A Walk-Forward Backtest
```python
from sklearn.metrics import mean_squared_error
import lightgbm as lgb

def walk_forward_backtest(df, cv, model_params=None):
    """
    Walk-forward backtest.

    Parameters:
        df: DataFrame with features and target
        cv: a WalkForwardCV instance
        model_params: dict of model parameters

    Returns:
        results: DataFrame with one row per period
    """
    if model_params is None:
        model_params = {
            'n_estimators': 100,
            'max_depth': 4,
            'learning_rate': 0.05,
            'verbose': -1
        }
    results = []
    for i, (train_idx, test_idx) in enumerate(cv.split(df)):
        # Prepare the data
        X_train = df[['feature']].iloc[train_idx].values
        y_train = df['return'].iloc[train_idx].values
        X_test = df[['feature']].iloc[test_idx].values
        y_test = df['return'].iloc[test_idx].values
        # Train
        model = lgb.LGBMRegressor(**model_params)
        model.fit(X_train, y_train)
        # Predict
        y_pred = model.predict(X_test)
        # Metrics
        mse = mean_squared_error(y_test, y_pred)
        ic = np.corrcoef(y_test, y_pred)[0, 1]
        results.append({
            'period': i + 1,
            'train_start': df.index[train_idx[0]].date(),
            'train_end': df.index[train_idx[-1]].date(),
            'test_start': df.index[test_idx[0]].date(),
            'test_end': df.index[test_idx[-1]].date(),
            'train_size': len(train_idx),
            'test_size': len(test_idx),
            'mse': mse,
            'ic': ic
        })
    return pd.DataFrame(results)

# Run the backtest
cv_wf = WalkForwardCV(train_size=500, test_size=100, retrain_freq=50)
results_df = walk_forward_backtest(df, cv_wf)
print("Walk-forward backtest results:")
print(results_df[['period', 'test_start', 'test_end', 'ic']].head(10))

# Overall performance
print(f"\nMean IC: {results_df['ic'].mean():.4f}")
print(f"IC std: {results_df['ic'].std():.4f}")
print(f"Share of periods with IC > 0: {(results_df['ic'] > 0).mean():.2%}")
```

6. Purging and Embargo
6.1 Background
Even a strictly chronological split can still leak:
- The purging problem: training samples near the boundary can be highly correlated with test samples, and labels computed over windows that extend into the test period share information with it.
- The embargo problem: because of serial correlation, samples immediately after a test window can still carry the test period's information into later training sets.
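A minimal numeric sketch of why purging is needed, assuming for illustration that labels are 5-day forward returns (the horizon and index values are assumptions of this example, not part of the classes below):

```python
# Toy illustration of label overlap at the train/test boundary.
horizon = 5      # assumed: a sample t's label uses data up to day t + horizon
test_start = 80  # assumed: the test set covers indices 80..99 of a 100-sample series

# A naive split trains on [0, 80). A training sample leaks whenever its
# label window reaches into the test period, i.e. t + horizon >= test_start:
leaky = [t for t in range(test_start) if t + horizon >= test_start]
print(leaky)  # [75, 76, 77, 78, 79]

# Purging simply drops those samples from the end of the training window:
purged_train_end = test_start - horizon
assert all(t + horizon < test_start for t in range(purged_train_end))
```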
6.2 Purging: Removing the Overlap Period
Remove from the training set the samples whose time span overlaps the test set.

```python
class PurgedCV:
    """Time-series cross-validation with purging."""

    def __init__(self, train_size=500, test_size=100, purge_size=21):
        """
        Parameters:
            train_size: training window length
            test_size: test window length
            purge_size: purge period (removed from the end of the training window)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.purge_size = purge_size

    def split(self, X, y=None):
        """Generate train/test indices with purging."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.test_size):
            test_end = min(test_start + self.test_size, n_samples)
            # Training window: drop the purge period at its end
            train_end = test_start - self.purge_size
            train_start = max(0, train_end - self.train_size)
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
            if len(train_idx) < self.train_size // 2:
                continue
            yield train_idx, test_idx

# Visualize the purging effect
cv_purge = PurgedCV(train_size=500, test_size=100, purge_size=21)
fig, ax = plt.subplots(figsize=(12, 2))
for i, (train_idx, test_idx) in enumerate(cv_purge.split(df)):
    if i >= 3:
        break
    # Training window (blue)
    ax.barh(i, len(train_idx), left=train_idx[0], height=0.6, color='steelblue')
    # Purge period (yellow)
    ax.barh(i, cv_purge.purge_size, left=train_idx[-1] + 1, height=0.6, color='gold', label='Purge' if i == 0 else '')
    # Test window (red)
    ax.barh(i, len(test_idx), left=test_idx[0], height=0.6, color='salmon', label='Test' if i == 0 else '')
ax.set_yticks([0, 1, 2])
ax.set_yticklabels(['Fold 1', 'Fold 2', 'Fold 3'])
ax.set_xlabel('Sample Index')
ax.set_title('Purged Cross-Validation (Yellow = Removed from Training)')
ax.legend(loc='upper right')
plt.tight_layout()
plt.show()
```

6.3 Embargo: A Blackout Period After the Test Set
Prevent information from a test window from being used in the next round of training.

```python
class EmbargoCV:
    """Time-series cross-validation with an embargo period."""

    def __init__(self, train_size=500, test_size=100, embargo_size=21):
        """
        Parameters:
            train_size: training window length
            test_size: test window length
            embargo_size: embargo period (samples after a test window that may not be trained on)
        """
        self.train_size = train_size
        self.test_size = test_size
        self.embargo_size = embargo_size

    def split(self, X, y=None):
        """Generate train/test indices with an embargo."""
        n_samples = X.shape[0]
        test_starts = []
        # Start positions of the test periods, spaced to leave the embargo gap
        current_test_start = self.train_size
        while current_test_start + self.test_size <= n_samples:
            test_starts.append(current_test_start)
            current_test_start += self.test_size + self.embargo_size
        for test_start in test_starts:
            test_end = min(test_start + self.test_size, n_samples)
            # Training window: must not include the previous test window or its embargo
            train_end = test_start
            train_start = max(0, train_end - self.train_size)
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
            if len(train_idx) < self.train_size // 2:
                continue
            yield train_idx, test_idx
```

6.4 Combining Purging and Embargo
```python
class PurgedEmbargoCV:
    """
    Time-series cross-validation with both purging and embargo.

    This is the strictest standard for quantitative backtesting.
    """

    def __init__(self, train_size=500, test_size=100, purge_size=21, embargo_size=21):
        """
        Parameters:
            train_size: training window length
            test_size: test window length
            purge_size: samples purged from the end of the training window
            embargo_size: embargo period after each test window
        """
        self.train_size = train_size
        self.test_size = test_size
        self.purge_size = purge_size
        self.embargo_size = embargo_size

    def split(self, X, y=None):
        """Generate train/test indices."""
        n_samples = X.shape[0]
        test_starts = []
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            test_starts.append(current_test_start)
            current_test_start += self.test_size + self.embargo_size
        for test_start in test_starts:
            test_end = min(test_start + self.test_size, n_samples)
            # Training window: drop the purge period at its end
            train_end = test_start - self.purge_size
            train_start = max(0, train_end - self.train_size)
            train_idx = np.arange(train_start, train_end)
            test_idx = np.arange(test_start, test_end)
            if len(train_idx) < self.train_size // 2:
                continue
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None):
        """Number of splits."""
        if X is None:
            return 0
        n_samples = X.shape[0]
        count = 0
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            count += 1
            current_test_start += self.test_size + self.embargo_size
        return count

# Usage
cv_pe = PurgedEmbargoCV(train_size=500, test_size=100, purge_size=21, embargo_size=21)
print(f"{cv_pe.get_n_splits(df)} splits in total")
```

7. A Unified Time-Series CV Wrapper
7.1 A Unified CV Class

```python
from sklearn.base import clone
from sklearn.model_selection import BaseCrossValidator

class TimeSeriesCV(BaseCrossValidator):
    """
    Unified time-series cross-validation.

    Supported splitting strategies:
    - 'rolling': rolling window
    - 'expanding': expanding window
    - 'walk_forward': walk-forward
    - 'purged_embargo': walk-forward with purging and embargo
    """

    def __init__(self, method='walk_forward', train_size=504, test_size=126,
                 step=63, purge_size=21, embargo_size=21):
        """
        Parameters:
            method: splitting strategy
            train_size: training window length
            test_size: test window length
            step: roll-forward step
            purge_size: purge period length
            embargo_size: embargo period length
        """
        self.method = method
        self.train_size = train_size
        self.test_size = test_size
        self.step = step
        self.purge_size = purge_size
        self.embargo_size = embargo_size

    def _get_rolling_splits(self, X):
        """Rolling-window splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(test_start - self.train_size, test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx

    def _get_expanding_splits(self, X):
        """Expanding-window splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(0, test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx

    def _get_walk_forward_splits(self, X):
        """Walk-forward splits."""
        n_samples = X.shape[0]
        for test_start in range(self.train_size, n_samples - self.test_size + 1, self.step):
            test_end = min(test_start + self.test_size, n_samples)
            train_idx = np.arange(max(0, test_start - self.train_size), test_start)
            test_idx = np.arange(test_start, test_end)
            yield train_idx, test_idx

    def _get_purged_embargo_splits(self, X):
        """Splits with purging and embargo."""
        n_samples = X.shape[0]
        current_test_start = self.train_size + self.purge_size
        while current_test_start + self.test_size <= n_samples:
            test_end = min(current_test_start + self.test_size, n_samples)
            train_end = current_test_start - self.purge_size
            train_idx = np.arange(max(0, train_end - self.train_size), train_end)
            test_idx = np.arange(current_test_start, test_end)
            if len(train_idx) >= self.train_size // 2:
                yield train_idx, test_idx
            current_test_start += self.test_size + self.embargo_size

    def split(self, X, y=None, groups=None):
        """Generate split indices."""
        if self.method == 'rolling':
            return self._get_rolling_splits(X)
        elif self.method == 'expanding':
            return self._get_expanding_splits(X)
        elif self.method == 'walk_forward':
            return self._get_walk_forward_splits(X)
        elif self.method == 'purged_embargo':
            return self._get_purged_embargo_splits(X)
        else:
            raise ValueError(f"Unknown method: {self.method}")

    def get_n_splits(self, X=None, y=None, groups=None):
        """Number of splits."""
        if X is None:
            return 0
        return len(list(self.split(X)))

# Usage
cv = TimeSeriesCV(method='purged_embargo', train_size=500, test_size=100,
                  step=50, purge_size=21, embargo_size=21)
print(f"Method: {cv.method}")
print(f"Number of splits: {cv.get_n_splits(df)}")
```

7.2 Integration with sklearn
```python
from sklearn.base import clone
from sklearn.linear_model import LinearRegression

# Create a time-series CV
cv_ts = TimeSeriesCV(method='walk_forward', train_size=500, test_size=100, step=100)

# Prepare the data
X = df[['feature']].values
y = df['return'].values

model = LinearRegression()

# Because TimeSeriesCV subclasses BaseCrossValidator, it can be passed
# directly to sklearn utilities such as cross_val_score. Here we write
# the scoring loop by hand to keep each step explicit.
def cross_val_score_time_series(model, X, y, cv):
    """Score a model with time-series cross-validation."""
    scores = []
    for train_idx, test_idx in cv.split(X):
        est = clone(model)  # fresh, unfitted estimator per fold
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        est.fit(X_train, y_train)
        scores.append(est.score(X_test, y_test))
    return np.array(scores)

# Usage
scores = cross_val_score_time_series(model, X, y, cv_ts)
print(f"Per-fold R²: {scores}")
print(f"Mean R²: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
```

8. Common Mistakes
8.1 Mistake 1: Random Shuffling

```python
# ❌ Wrong: shuffling time-series data
from sklearn.model_selection import KFold
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, test_idx in kf.split(X):
    # This leaks future information into training!
    pass

# ✅ Right: use a time-series CV
cv = TimeSeriesCV(method='walk_forward', train_size=500, test_size=100)
for train_idx, test_idx in cv.split(X):
    # Properly ordered splits
    pass
```

8.2 Mistake 2: Standardizing Before Splitting
```python
# ❌ Wrong: global standardization
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # uses statistics from the full dataset
# ...and only then splitting

# ✅ Right: standardize inside the CV loop
from sklearn.pipeline import Pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', LinearRegression())
])
for train_idx, test_idx in cv.split(X):
    X_train, X_test = X[train_idx], X[test_idx]
    y_train = y[train_idx]
    pipeline.fit(X_train, y_train)  # the scaler is fit on the training fold only
    # the pipeline applies the same scaling automatically at predict time
```

8.3 Mistake 3: An Incorrect Purging Implementation
```python
# ❌ Wrong: trims the training end blindly, without checking where the test set starts
def wrong_purge(train_end, test_start):
    return train_end - 21  # always removes 21 days, even if that leaves no gap

# ✅ Right: guarantee a sufficient gap between training end and test start
def correct_purge(train_end, test_start, purge_size=21):
    # training end + purge period <= test start
    return min(train_end, test_start - purge_size)
```

8.4 Mistake 4: Using Future Information in Backtests
```python
# ❌ Wrong: shifting a rolling statistic backwards pulls in future data
df['rolling_mean'] = df['price'].rolling(20).mean()            # fine
df['rolling_mean'] = df['price'].rolling(20).mean().shift(-1)  # wrong! uses future data

# ❌ Wrong: a feature engineered directly from the target
df['feature'] = df['return'] * 0.5

# ✅ Right: historical information only
df['rolling_mean'] = df['price'].rolling(20).mean()
df['momentum'] = df['price'].pct_change(20)
```

Key Takeaways
Splitting Methods Compared

| Method | Training set | Use case | Pros | Cons |
|---|---|---|---|---|
| Simple split | Fixed | Quick checks | Simple | A single test is noisy |
| Rolling window | Fixed size | Production | Stable distribution | Discards old history |
| Expanding window | Grows | Research | Uses all data | Slow to train |
| Walk-forward | Mimics live trading | Formal backtests | Closest to production | More complex to implement |

Purging and Embargo

| Technique | Purpose | Typical setting |
|---|---|---|
| Purging | Remove training samples at the boundary that overlap the test set | 21 days (~1 month) |
| Embargo | Bar samples right after a test window from the next training set | 21 days (~1 month) |
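For comparison, scikit-learn's built-in `TimeSeriesSplit` offers a `gap` parameter (available since scikit-learn 0.24) that leaves a purge-like buffer between each training window and its test window:

```python
from sklearn.model_selection import TimeSeriesSplit
import numpy as np

X = np.arange(100).reshape(-1, 1)

# gap=5 excludes the 5 samples just before each test window from training,
# playing a role similar to the purge period above.
tscv = TimeSeriesSplit(n_splits=3, test_size=10, gap=5)
for train_idx, test_idx in tscv.split(X):
    # 5 indices separate the last training sample from the first test sample
    print(train_idx[-1], test_idx[0])
```

Note that `TimeSeriesSplit` always expands the training window from index 0 (capped by `max_train_size`); for a true walk-forward with embargo you still need a custom splitter like the ones above.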
Choosing a Time-Series CV

- Small datasets (< 1,000 samples) → simple split or expanding window
- Medium datasets (1,000-10,000) → walk-forward
- Large datasets (> 10,000) → rolling window
- High-frequency data (minute bars) → purge_size ≈ 1 hour, embargo_size ≈ 30 minutes
- Daily data → purge_size = 5 days, embargo_size = 5 days
- Weekly/monthly data → purge_size = 1 period, embargo_size = 1 period
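The size-based part of the guide above can be sketched as a tiny helper. This is purely illustrative: `choose_cv_method` and its thresholds are assumptions of this sketch, not part of the `TimeSeriesCV` class.

```python
# Hypothetical helper encoding the data-size rules of thumb above.
def choose_cv_method(n_samples):
    """Map data size to a splitting strategy, per the selection guide."""
    if n_samples < 1000:
        return 'expanding'     # little data: simple split or expanding window
    elif n_samples <= 10000:
        return 'walk_forward'  # medium data
    else:
        return 'rolling'       # large data: rolling window

print(choose_cv_method(500), choose_cv_method(5000), choose_cv_method(50000))
```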
Key Principles
- Causality first: always predict the future from the past, never the reverse
- Avoid leakage: standardization and feature engineering must happen after (or inside) the split
- Mimic production: walk-forward with purging and embargo is the strictest standard
- Test thoroughly: the test set should cover at least one full market cycle

Next: 03-模型训练与调优.md - how to train and tune tree models efficiently.