Splitting Time-Series Data
1. What Makes Quantitative Time-Series Data Special
1.1 The Causality Constraint
Core problem
In quantitative investing, data splitting must strictly respect causality: only historical data may be used to predict the future, and future information must never leak in.
Mathematical statement
For a prediction at time t (symbols reconstructed from context):
ŷ_{t+1} = f(F_t)
where F_t denotes all features available at time t and earlier; X_{t+1}, X_{t+2}, … must not be included.
违反因果性的典型错误
# 错误示例1:全局标准化
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # 使用全部数据计算均值和方差
# 这导致 t 时刻的特征使用了 t+1, t+2, ... 的统计信息
# 问题:scaler.fit使用全部数据的mean和std# 错误示例2:PCA降维
from sklearn.decomposition import PCA
pca = PCA(n_components=5)
X_pca = pca.fit_transform(X) # 使用全部数据计算主成分
# 问题:PCA的协方差矩阵基于全部数据正确做法
# 正确示例1:滚动标准化
from sklearn.preprocessing import StandardScaler
def rolling_standardize(X, window=252):
X_scaled = np.zeros_like(X)
for i in range(window, len(X)):
scaler = StandardScaler()
X_scaled[i] = scaler.fit_transform(X[i-window:i])[-1]
return X_scaled1.2 数据泄露的检测方法
Forward (look-ahead) leakage
The feature computation uses future data, e.g. X_t = g(P_{t+1}, P_{t+2}, …).
Detection method

```python
import numpy as np

def detect_lookahead_leakage(X, y, window=5):
    """
    Detect whether a feature uses future information.
    Method: check the feature's correlation with the target several periods ahead.
    X, y: 1-D arrays of the feature and the target, aligned in time.
    """
    correlations = []
    for lag in range(1, window + 1):
        # Correlation between the feature now and the target `lag` periods ahead
        corr = np.corrcoef(X[:-lag], y[lag:])[0, 1]
        correlations.append(corr)
    # A strong correlation may indicate future information leaking into the feature
    if max(correlations) > 0.1:
        print(f"Warning: potential look-ahead leakage, max correlation: {max(correlations):.4f}")
    return correlations
```

Cross-sectional leakage
Information from other stocks in the same cross-section is used.
Detection method

```python
import numpy as np

def detect_cross_section_leakage(X):
    """
    Detect cross-sectional information leakage.
    Method: check the instantaneous correlation of features across stocks.
    X: array of shape [n_stocks, n_timesteps, n_features].
    """
    n_stocks = X.shape[0]
    t_corr = []
    for t in range(X.shape[1]):
        # Correlation across all stocks at time t
        Xt = X[:, t, :]
        corr_matrix = np.corrcoef(Xt)
        # Average off-diagonal absolute correlation
        avg_corr = (np.sum(np.abs(corr_matrix)) - n_stocks) / (n_stocks * (n_stocks - 1))
        t_corr.append(avg_corr)
    return t_corr
```

2. Time-Series Cross-Validation Methods
2.1 Limitations of Traditional K-Fold
Why can't we use shuffled K-Fold?

```python
# Wrong example: shuffled K-Fold
from sklearn.model_selection import KFold

kf = KFold(n_splits=5, shuffle=True)
for train_idx, val_idx in kf.split(X):
    # Problem: training and validation sets are interleaved in time
    # e.g. train may contain 2023 data while val contains 2022 data
    X_train, X_val = X[train_idx], X[val_idx]
```

Problem analysis
- Future information leakage: the validation set may predate parts of the training set
- Distorted backtests: this does not match how investing actually works
- Biased evaluation: model performance is inflated
Illustration
Timeline: 2018 | 2019 | 2020 | 2021 | 2022 | 2023
Shuffled K-Fold might produce:
Train: 2018, 2020, 2023
Val:   2019, 2021, 2022
This is impossible in practice!
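The interleaving above is easy to verify directly. A minimal sketch with synthetic data (index positions standing in for time): with `shuffle=True`, K-Fold routinely places later samples in the training set than in the validation set, while `TimeSeriesSplit` never does.

```python
import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit

n = 12  # twelve periods, e.g. monthly samples over one year
X = np.arange(n).reshape(-1, 1)

# Shuffled K-Fold: validation indices can precede training indices in time
kf = KFold(n_splits=3, shuffle=True, random_state=0)
for train_idx, val_idx in kf.split(X):
    leaked = val_idx.min() < train_idx.max()  # train contains data "after" val
    print(f"KFold  train_max={train_idx.max()}, val_min={val_idx.min()}, leaked={leaked}")

# TimeSeriesSplit: every training index strictly precedes every validation index
tscv = TimeSeriesSplit(n_splits=3)
for train_idx, val_idx in tscv.split(X):
    assert train_idx.max() < val_idx.min()
    print(f"TSCV   train={train_idx[0]}-{train_idx[-1]}, val={val_idx[0]}-{val_idx[-1]}")
```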
2.2 Time-Series Cross-Validation (TimeSeriesSplit)
Basic principle

```python
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tscv.split(X):
    # Training set:   [t_start, t_train_end]
    # Validation set: [t_train_end+1, t_val_end]
    X_train, X_val = X[train_idx], X[val_idx]
```

Visualization
Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |==== Train ====|=== Val ===|.....|
Fold 3: |===== Train =====|==== Val ====|..|
Fold 4: |====== Train ======|===== Val =====|
Fold 5: |======= Train =======|====== Val ==|
Code implementation

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
import lightgbm as lgb

def time_series_cv(X, y, params, n_splits=5):
    """
    Time-series cross-validation.
    Args:
        X: feature matrix, shape=[n_samples, n_features]
        y: target, shape=[n_samples]
        params: LightGBM parameters
        n_splits: number of folds
    Returns:
        val_scores: validation score for each fold
    """
    tscv = TimeSeriesSplit(n_splits=n_splits)
    val_scores = []
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        print(f"Fold {fold + 1}/{n_splits}")
        print(f"  Train: {train_idx[0]} - {train_idx[-1]}")
        print(f"  Val:   {val_idx[0]} - {val_idx[-1]}")
        # Split the data
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]
        # Build datasets
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        # Train the model
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
        # Evaluate
        y_pred = model.predict(X_val)
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        print(f"  Validation IC: {score:.4f}")
    return val_scores
```

2.3 Rolling-Window Cross-Validation
When to use it
When the data distribution shifts sharply over time, train on a fixed-size rolling window.
Comparison
Expanding window:
Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |==== Train ====|=== Val ===|.....|
Fold 3: |===== Train =====|==== Val ====|..|
Rolling window:
Fold 1: |=== Train ===|== Val ==|.........|
Fold 2: |...=== Train ===|== Val ==|.....|
Fold 3: |.....=== Train ===|== Val ==|..|
Code implementation

```python
def rolling_window_cv(X, y, params, train_size=252, val_size=21, step=21):
    """
    Rolling-window cross-validation.
    Args:
        train_size: training window size (e.g. 252 trading days ≈ 1 year)
        val_size: validation window size (e.g. 21 trading days ≈ 1 month)
        step: roll step
    Returns:
        val_scores: validation score for each fold
        models: list of trained models
    """
    val_scores = []
    models = []
    n_samples = len(X)
    start_idx = train_size
    fold = 0
    while start_idx + val_size <= n_samples:
        print(f"Fold {fold + 1}")
        # Split the data
        train_start = start_idx - train_size
        train_end = start_idx
        val_start = start_idx
        val_end = start_idx + val_size
        print(f"  Train: {train_start} - {train_end}")
        print(f"  Val:   {val_start} - {val_end}")
        X_train, X_val = X[train_start:train_end], X[val_start:val_end]
        y_train, y_val = y[train_start:train_end], y[val_start:val_end]
        # Train the model
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
        # Evaluate
        y_pred = model.predict(X_val)
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        models.append(model)
        print(f"  Validation IC: {score:.4f}")
        # Roll the window forward
        start_idx += step
        fold += 1
    return val_scores, models
```

Application in quantitative research

```python
# Example: validating a quantitative factor with a rolling window
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.05,
}
# Training window: 1 year (252 trading days)
# Validation window: 1 month (21 trading days)
# Roll step: 1 month
val_scores, models = rolling_window_cv(
    X, y, params,
    train_size=252,
    val_size=21,
    step=21
)
print(f"Mean IC: {np.mean(val_scores):.4f}")
print(f"IC std:  {np.std(val_scores):.4f}")
```

2.4 Walk-Forward Validation
Core idea
Simulate live trading: after each validation window, fold it into the training set and move forward.
Difference from the rolling window
- Rolling window: fixed training window size
- Walk-forward: the training window grows over time
Code implementation

```python
def walk_forward_validation(X, y, params, initial_train_size=252, val_size=21, step=21):
    """
    Walk-forward validation.
    Args:
        initial_train_size: initial training window size
        val_size: validation window size
        step: forward step
    Returns:
        predictions: all out-of-sample predictions
        val_scores: validation score for each fold
        models: list of trained models
    """
    predictions = []
    val_scores = []
    models = []
    n_samples = len(X)
    start_idx = initial_train_size
    fold = 0
    while start_idx + val_size <= n_samples:
        print(f"Fold {fold + 1}")
        # Split the data
        train_end = start_idx
        val_start = start_idx
        val_end = start_idx + val_size
        print(f"  Train: 0 - {train_end}")
        print(f"  Val:   {val_start} - {val_end}")
        X_train, X_val = X[:train_end], X[val_start:val_end]
        y_train, y_val = y[:train_end], y[val_start:val_end]
        # Train the model
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data, val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(period=100)
            ]
        )
        # Predict
        y_pred = model.predict(X_val)
        predictions.extend(y_pred)
        # Evaluate
        score = np.corrcoef(y_pred, y_val)[0, 1]
        val_scores.append(score)
        models.append(model)
        print(f"  Validation IC: {score:.4f}")
        # Move forward
        start_idx += step
        fold += 1
    return predictions, val_scores, models
```

3. Purging and Embargo (Advanced)
3.1 Why Is Extra Protection Still Needed?
Problem scenario:
Suppose:
- Last day of the training set: 2021-12-31
- First day of the validation set: 2022-01-01
- Your label is the forward 5-day return
Then the label on 2021-12-31 depends on information through 2022-01-05.
Even though 2021-12-31 sits in the training set, its label contains information from the validation period!
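The size of this overlap can be counted directly. A minimal sketch with hypothetical dates (calendar days rather than trading days, for simplicity):

```python
import pandas as pd

# Assumed setup: daily dates, labels are forward 5-day returns (horizon = 5)
dates = pd.date_range("2021-12-20", "2022-01-10", freq="D")
train_end = pd.Timestamp("2021-12-31")
horizon = 5

# For each date t, the label spans the interval (t, t + horizon]
label_end = dates + pd.Timedelta(days=horizon)

# Training samples whose label window reaches into the validation period
overlap = (dates <= train_end) & (label_end > train_end)
print(f"{overlap.sum()} training samples have labels overlapping the validation period")
# prints: 5 training samples have labels overlapping the validation period
# These are exactly the last `horizon` days of the training set -> purge them
```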
3.2 Purging
Principle:
Drop the last N days of the training set, where N equals the label horizon.

```python
def train_val_test_split_with_purging(X, y, dates,
                                      train_ratio=0.6,
                                      val_ratio=0.2,
                                      test_ratio=0.2,
                                      horizon=5):
    """
    Train/val/test split with purging.
    Args:
        X, y: features and labels
        dates: date array, shape=[n_samples]
        train_ratio, val_ratio, test_ratio: split proportions
        horizon: label horizon (how many future days the label uses)
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    # Unique dates, sorted
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    train_end_date = unique_dates[train_end_idx]
    val_end_date = unique_dates[val_end_idx]
    print("Original split:")
    print(f"  Train: {unique_dates[0]} ~ {train_end_date}")
    print(f"  Val:   {unique_dates[train_end_idx+1]} ~ {val_end_date}")
    print(f"  Test:  {unique_dates[val_end_idx+1]} ~ {unique_dates[-1]}")
    # Purging: drop the last `horizon` days of the training set
    purge_start_date = unique_dates[train_end_idx - horizon]
    # Build masks
    train_mask = dates <= purge_start_date
    valid_mask = (dates > train_end_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    print(f"\nPurging (dropping the last {horizon} days of the training set):")
    print(f"  Old training end: {train_end_date}")
    print(f"  New training end: {purge_start_date}")
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    print("\nFinal split:")
    print(f"  Train: {len(X_train)} samples")
    print(f"  Val:   {len(X_val)} samples")
    print(f"  Test:  {len(X_test)} samples")
    return (X_train, X_val, X_test), (y_train, y_val, y_test)

# Usage example
dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
X = np.random.randn(len(dates), 10)
y = np.random.randn(len(dates))

(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_with_purging(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    horizon=5
)
```

3.3 Embargo
Principle:
Leave an extra buffer of a few days at the start of the validation set.

```python
def train_val_test_split_with_embargo(X, y, dates,
                                      train_ratio=0.6,
                                      val_ratio=0.2,
                                      test_ratio=0.2,
                                      embargo=3):
    """
    Train/val/test split with an embargo.
    Args:
        X, y: features and labels
        dates: date array
        train_ratio, val_ratio, test_ratio: split proportions
        embargo: embargo length in days
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    # Unique dates, sorted
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    train_end_date = unique_dates[train_end_idx]
    val_end_date = unique_dates[val_end_idx]
    # Embargo: leave `embargo` extra days before the validation set starts
    embargo_start_date = unique_dates[train_end_idx + embargo]
    # Build masks
    train_mask = dates <= train_end_date
    valid_mask = (dates > embargo_start_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    print(f"\nEmbargo ({embargo} days skipped at the start of the validation set):")
    print(f"  Old validation start: {unique_dates[train_end_idx+1]}")
    print(f"  New validation start: {embargo_start_date}")
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    print("\nFinal split:")
    print(f"  Train: {len(X_train)} samples")
    print(f"  Val:   {len(X_val)} samples ({embargo} days dropped)")
    print(f"  Test:  {len(X_test)} samples")
    return (X_train, X_val, X_test), (y_train, y_val, y_test)

# Usage example
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_with_embargo(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    embargo=3
)
```

3.4 Full Purging + Embargo
```python
def train_val_test_split_full(X, y, dates,
                              train_ratio=0.6,
                              val_ratio=0.2,
                              test_ratio=0.2,
                              horizon=5,
                              embargo=3):
    """
    Full split with both purging and an embargo.
    Timeline:
        | Train | Purge | Embargo | Val |
                   ^         ^
                dropped    buffer
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    # Unique dates, sorted
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    # Split points
    train_end_idx = int(n_dates * train_ratio)
    val_end_idx = int(n_dates * (train_ratio + val_ratio))
    # Purging: drop the last `horizon` days of the training set
    purge_start_date = unique_dates[train_end_idx - horizon]
    # Embargo: leave `embargo` days before the validation set starts
    embargo_start_date = unique_dates[train_end_idx + embargo]
    val_end_date = unique_dates[val_end_idx]
    # Build masks
    train_mask = dates <= purge_start_date
    valid_mask = (dates > embargo_start_date) & (dates <= val_end_date)
    test_mask = dates > val_end_date
    # Split the data
    X_train, y_train = X[train_mask], y[train_mask]
    X_val, y_val = X[valid_mask], y[valid_mask]
    X_test, y_test = X[test_mask], y[test_mask]
    print("Full split (purging + embargo):")
    print(f"  Train: {len(X_train)} samples (last {horizon} days dropped)")
    print(f"  Val:   {len(X_val)} samples (first {embargo} days dropped)")
    print(f"  Test:  {len(X_test)} samples")
    return (X_train, X_val, X_test), (y_train, y_val, y_test)

# Usage example
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split_full(
    X, y, dates,
    train_ratio=0.6,
    val_ratio=0.2,
    test_ratio=0.2,
    horizon=5,
    embargo=3
)
```

3.5 Walk-Forward Validation with Purging
A more realistic validation scheme: simulate live trading.

```python
from scipy.stats import pearsonr

def walk_forward_validation_with_purging(X, y, dates, model_class, params,
                                         initial_train_size=252,
                                         val_size=21,
                                         step=21,
                                         horizon=5):
    """
    Walk-forward validation with purging.
    Simulates live trading:
        Window 1: Train(2020)         -> Test(2021 Q1)
        Window 2: Train(2020~2021 Q1) -> Test(2021 Q2)
        Window 3: Train(2020~2021 Q2) -> Test(2021 Q3)
        ...
    """
    unique_dates = np.unique(dates)
    n_dates = len(unique_dates)
    val_scores = []
    models = []
    windows = []
    start_idx = initial_train_size
    fold = 0
    while start_idx + val_size + horizon <= n_dates:
        print(f"\n{'='*60}")
        print(f"Window {fold + 1}")
        print(f"{'='*60}")
        # Training set: drop the last `horizon` days (purging)
        train_end = start_idx - horizon
        train_end_date = unique_dates[train_end]
        # Validation set: starts at start_idx
        val_start = start_idx
        val_end = start_idx + val_size
        val_start_date = unique_dates[val_start]
        val_end_date = unique_dates[val_end]
        print(f"Train: {unique_dates[0]} ~ {train_end_date}")
        print(f"Val:   {val_start_date} ~ {val_end_date}")
        print(f"Purging: dropped {unique_dates[train_end+1]} ~ {unique_dates[start_idx-1]} ({horizon} days)")
        # Split the data
        train_mask = (dates >= unique_dates[0]) & (dates <= train_end_date)
        val_mask = (dates >= val_start_date) & (dates <= val_end_date)
        X_train, y_train = X[train_mask], y[train_mask]
        X_val, y_val = X[val_mask], y[val_mask]
        print(f"Train samples: {len(X_train)}, val samples: {len(X_val)}")
        # Train the model
        model = model_class(**params)
        model.fit(X_train, y_train)
        # Predict
        y_pred = model.predict(X_val)
        # Evaluate
        ic = pearsonr(y_pred, y_val)[0]
        val_scores.append(ic)
        models.append(model)
        windows.append({
            'train_start': unique_dates[0],
            'train_end': train_end_date,
            'val_start': val_start_date,
            'val_end': val_end_date,
            'ic': ic
        })
        print(f"Validation IC: {ic:.4f}")
        # Move forward
        start_idx += step
        fold += 1
    # Summary statistics
    print(f"\n{'='*60}")
    print("Walk-forward validation summary")
    print(f"{'='*60}")
    print(f"Windows: {len(windows)}")
    print(f"Mean IC: {np.mean(val_scores):.4f}")
    print(f"IC std:  {np.std(val_scores):.4f}")
    print(f"ICIR:    {np.mean(val_scores) / np.std(val_scores):.4f}")
    print(f"IC hit rate: {(np.array(val_scores) > 0).mean():.2%}")
    return val_scores, models, windows

# Usage example
from lightgbm import LGBMRegressor

params = {
    'objective': 'regression',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'verbosity': -1,
}
val_scores, models, windows = walk_forward_validation_with_purging(
    X, y, dates, LGBMRegressor, params,
    initial_train_size=252,  # 1 year
    val_size=21,             # 1 month
    step=21,                 # 1 month
    horizon=5                # 5 days
)

# Plot the IC series
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot([w['ic'] for w in windows], 'o-', linewidth=2, markersize=8)
plt.axhline(y=np.mean(val_scores), color='r', linestyle='--',
            label=f'Mean IC: {np.mean(val_scores):.4f}')
plt.axhline(y=0, color='gray', linestyle='-', alpha=0.3)
plt.xlabel('Window')
plt.ylabel('IC')
plt.title('Walk-Forward Validation IC')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
```

Sample output:
Window 1 (2024-Q1): IC = 0.0536
Window 2 (2024-Q2): IC = 0.0456
Window 3 (2024-Q3): IC = 0.0408
Window 4 (2024-Q4): IC = 0.0305
Mean IC: 0.0426 ± 0.0083
→ If IC declines over time, the model needs retraining
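One simple way to make "IC declines over time" concrete is to fit a linear trend to the per-window ICs. A minimal sketch using the illustrative numbers above (the decision of when a negative slope warrants retraining is a judgment call, not a standard rule):

```python
import numpy as np

# Illustrative IC series from the four walk-forward windows above
ic_series = np.array([0.0536, 0.0456, 0.0408, 0.0305])

# Fit a linear trend over the window index; a negative slope indicates decay
slope, intercept = np.polyfit(np.arange(len(ic_series)), ic_series, 1)
print(f"IC trend slope per window: {slope:.4f}")  # → -0.0074
if slope < 0:
    print("ICs are trending down -> consider retraining more frequently")
```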
4. Data-Splitting Strategies for Quantitative Scenarios
4.1 Train/Validation/Test Split
Recommended proportions
Timeline: |======== Train ======|==== Val ====|=== Test ===|
           2018-2021             2022          2023
           4 years               1 year        1 year
Code implementation

```python
def train_val_test_split(X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
    """
    Three-way split for time-series data.
    Args:
        train_ratio: training proportion
        val_ratio: validation proportion
        test_ratio: test proportion
    Returns:
        (X_train, X_val, X_test), (y_train, y_val, y_test)
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
    n_samples = len(X)
    train_end = int(n_samples * train_ratio)
    val_end = int(n_samples * (train_ratio + val_ratio))
    X_train, X_val, X_test = X[:train_end], X[train_end:val_end], X[val_end:]
    y_train, y_val, y_test = y[:train_end], y[train_end:val_end], y[val_end:]
    return (X_train, X_val, X_test), (y_train, y_val, y_test)
```

Practical application

```python
# Example: 4 years train, 1 year validation, 1 year test
(X_train, X_val, X_test), (y_train, y_val, y_test) = train_val_test_split(
    X, y,
    train_ratio=4/6,
    val_ratio=1/6,
    test_ratio=1/6
)
print(f"Train: {len(X_train)} samples")
print(f"Val:   {len(X_val)} samples")
print(f"Test:  {len(X_test)} samples")
```

4.2 Splitting Across Market Regimes
Why consider market regimes?
Factor performance differs drastically across market states (bull, bear, and range-bound markets).
Identifying the market regime

```python
def identify_market_regime(prices, window=252):
    """
    Identify the market regime.
    Args:
        prices: price series (pd.Series), shape=[n_samples]
        window: rolling window size
    Returns:
        regimes: regime labels (0 = range-bound, 1 = bull, -1 = bear)
    """
    # Returns (keep NaN in the first slot so indices stay aligned with prices)
    returns = prices.pct_change()
    # Rolling trend and volatility
    trend = returns.rolling(window).mean()
    volatility = returns.rolling(window).std()
    # Threshold
    trend_threshold = trend.mean() + 0.5 * volatility.mean()
    regimes = np.zeros(len(prices))
    for i in range(window, len(prices)):
        if trend.iloc[i] > trend_threshold:
            regimes[i] = 1   # bull
        elif trend.iloc[i] < -trend_threshold:
            regimes[i] = -1  # bear
        else:
            regimes[i] = 0   # range-bound
    return regimes
```

Splitting by market regime
```python
def split_by_regime(X, y, regimes):
    """
    Split the data by market regime.
    Returns:
        dict: {regime: (X_regime, y_regime)}
    """
    regime_splits = {}
    for regime in [-1, 0, 1]:
        mask = regimes == regime
        regime_splits[regime] = (X[mask], y[mask])
    return regime_splits

# Usage example
regime_splits = split_by_regime(X, y, regimes)
for regime, (X_regime, y_regime) in regime_splits.items():
    regime_name = {1: 'bull', 0: 'range-bound', -1: 'bear'}[regime]
    print(f"{regime_name}: {len(X_regime)} samples")
```

Balancing samples across market regimes
```python
def balance_regime_samples(X, y, regimes, target_ratio=0.3):
    """
    Balance the sample proportions across market regimes.
    Args:
        target_ratio: target proportion for bull and bear samples
    Returns:
        (X_balanced, y_balanced)
    """
    # Current proportions
    bull_ratio = np.sum(regimes == 1) / len(regimes)
    bear_ratio = np.sum(regimes == -1) / len(regimes)
    # Resampling factors
    bull_factor = target_ratio / bull_ratio if bull_ratio > 0 else 0
    bear_factor = target_ratio / bear_ratio if bear_ratio > 0 else 0
    X_balanced, y_balanced = [], []
    for regime in [-1, 0, 1]:
        mask = regimes == regime
        X_regime, y_regime = X[mask], y[mask]
        if regime == 1:
            factor = bull_factor
        elif regime == -1:
            factor = bear_factor
        else:
            factor = 1.0
        # Oversample or subsample
        n_samples = int(len(X_regime) * factor)
        if factor > 1:
            # Sample with replacement (oversampling)
            indices = np.random.choice(len(X_regime), n_samples, replace=True)
        else:
            # Sample without replacement (subsampling)
            indices = np.random.choice(len(X_regime), n_samples, replace=False)
        X_balanced.append(X_regime[indices])
        y_balanced.append(y_regime[indices])
    X_balanced = np.vstack(X_balanced)
    y_balanced = np.hstack(y_balanced)
    return X_balanced, y_balanced
```

4.3 Splitting by Industry
Industry heterogeneity
Feature and return patterns differ significantly across industries, so industry-aware splits are needed.
Code implementation

```python
def split_by_industry(X, y, industry_codes):
    """
    Split the data by industry.
    Args:
        industry_codes: industry codes, shape=[n_samples]
    Returns:
        dict: {industry_code: (X_industry, y_industry)}
    """
    industry_splits = {}
    for code in np.unique(industry_codes):
        mask = industry_codes == code
        industry_splits[code] = (X[mask], y[mask])
    return industry_splits
```

Industry neutralization
```python
def industry_neutralize(X, y, industry_codes):
    """
    Industry neutralization: remove industry effects.
    Method: standardize within each industry.
    """
    X_neutral = np.zeros_like(X)
    y_neutral = np.zeros_like(y)
    for code in np.unique(industry_codes):
        mask = industry_codes == code
        # Standardize features within the industry
        X_industry = X[mask]
        X_mean = X_industry.mean(axis=0)
        X_std = X_industry.std(axis=0)
        X_neutral[mask] = (X_industry - X_mean) / (X_std + 1e-8)
        # Standardize the target within the industry
        y_industry = y[mask]
        y_mean = y_industry.mean()
        y_std = y_industry.std()
        y_neutral[mask] = (y_industry - y_mean) / (y_std + 1e-8)
    return X_neutral, y_neutral
```

5. Best Practices for Data Splitting
5.1 A Complete Data-Splitting Pipeline

```python
class QuantDataSplitter:
    """
    Data splitter for quantitative research.
    Features:
        1. Time-series split
        2. Market-regime identification
        3. Industry neutralization
        4. Split validation
    """
    def __init__(self, prices, industry_codes=None):
        self.prices = prices
        self.industry_codes = industry_codes

    def identify_regimes(self, window=252):
        """Identify market regimes."""
        returns = self.prices.pct_change()
        trend = returns.rolling(window).mean()
        volatility = returns.rolling(window).std()
        trend_threshold = trend.mean() + 0.5 * volatility.mean()
        regimes = np.zeros(len(self.prices))
        for i in range(window, len(self.prices)):
            if trend.iloc[i] > trend_threshold:
                regimes[i] = 1
            elif trend.iloc[i] < -trend_threshold:
                regimes[i] = -1
            else:
                regimes[i] = 0
        self.regimes = regimes
        return regimes

    def train_val_test_split(self, X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
        """Three-way split."""
        assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6
        n_samples = len(X)
        train_end = int(n_samples * train_ratio)
        val_end = int(n_samples * (train_ratio + val_ratio))
        splits = {
            'train': (X[:train_end], y[:train_end]),
            'val': (X[train_end:val_end], y[train_end:val_end]),
            'test': (X[val_end:], y[val_end:])
        }
        return splits

    def validate_no_leakage(self, X_train, X_val):
        """Check for information leakage / distribution drift."""
        train_mean = X_train.mean(axis=0)
        train_std = X_train.std(axis=0)
        val_mean = X_val.mean(axis=0)
        # Compare means, scaled by the training std
        mean_diff = np.abs(train_mean - val_mean) / (train_std + 1e-8)
        if np.any(mean_diff > 3):
            print("Warning: train and validation distributions differ substantially")
            print(f"Max standardized difference: {np.max(mean_diff):.4f}")
        return mean_diff

    def split(self, X, y, neutralize_industry=True):
        """Full splitting pipeline."""
        # 1. Identify market regimes
        self.identify_regimes()
        # 2. Industry neutralization (reuses the industry_neutralize function defined above)
        if self.industry_codes is not None and neutralize_industry:
            X, y = industry_neutralize(X, y, self.industry_codes)
        # 3. Three-way split
        splits = self.train_val_test_split(X, y)
        # 4. Check for leakage
        X_train, X_val = splits['train'][0], splits['val'][0]
        self.validate_no_leakage(X_train, X_val)
        return splits
```

Usage example
```python
# Create the splitter
splitter = QuantDataSplitter(prices, industry_codes)
# Split the data
splits = splitter.split(X, y)
# Unpack each subset
X_train, y_train = splits['train']
X_val, y_val = splits['val']
X_test, y_test = splits['test']
# Print statistics
print(f"Train: {len(X_train)} samples")
print(f"Val:   {len(X_val)} samples")
print(f"Test:  {len(X_test)} samples")
```

5.2 Data-Splitting Checklist
Before splitting
- Data is sorted by time
- Future-information leakage removed
- No NaNs, or NaNs handled sensibly
- Feature standardization happens after the split
- Industry / market-regime information is complete
After splitting
- Train, validation, and test sets are in chronological order
- Validation and test sets come strictly after the training set
- Distribution differences (mean, variance) checked
- Target distribution looks reasonable
- Split proportions and date ranges recorded
Metric checks
- Evaluate train, validation, and test sets separately
- Check IC / Rank IC stability across subsets
- Quantify the degree of overfitting
- Check out-of-sample performance decay
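The ordering items in the checklist can be enforced mechanically rather than eyeballed. A minimal sketch (the helper name and the datetime64 arrays are illustrative, not from the text above):

```python
import numpy as np

def check_split_order(train_dates, val_dates, test_dates):
    """Checklist sanity checks: each subset is sorted and strictly follows the previous one."""
    for name, d in [("train", train_dates), ("val", val_dates), ("test", test_dates)]:
        assert np.all(d[:-1] <= d[1:]), f"{name} dates are not sorted"
    assert train_dates.max() < val_dates.min(), "validation set overlaps the training period"
    assert val_dates.max() < test_dates.min(), "test set overlaps the validation period"
    return True

# Usage with hypothetical monthly timestamps
dates = np.arange(np.datetime64("2020-01"), np.datetime64("2021-01"))  # 12 months
print(check_split_order(dates[:8], dates[8:10], dates[10:]))  # → True
```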
6. Summary
Splitting time-series data is a critical step in quantitative investing, and the core principle is strict adherence to causality. The main methods are:
- Time-series cross-validation: training data always strictly precedes validation data
- Rolling-window validation: adapts to distribution shift over time
- Walk-forward validation: simulates live trading
- Multi-dimensional splits: account for market regimes, industries, and similar factors
Correct data splitting is the foundation of robust quantitative models; strict splitting conventions must be established at the very start of model development.