04-时序数据处理
本节介绍量化时序数据的处理方法,包括滑动窗口、标准化、Dataset 构建和数据划分等核心技巧。
滑动窗口(Sliding Window)
原理
滑动窗口是时序建模的核心技术,将一维时间序列转换为监督学习样本。
滑动窗口示意图:
原始序列:
t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8
[A] [B] [C] [D] [E] [F] [G] [H]
seq_len=3, horizon=1 的滑动窗口:
样本1: [A] [B] [C] → [D]
└───┬───┘ └┬┘
输入(X) 标签(Y)
样本2: [B] [C] [D] → [E]
└───┬───┘ └┬┘
输入(X) 标签(Y)
样本3: [C] [D] [E] → [F]
└───┬───┘ └┬┘
输入(X) 标签(Y)
...
窗口向右滑动,每次移动 1 步
单步预测 vs 多步预测
import numpy as np
def create_sliding_windows(data, seq_len, horizon=1):
"""
创建滑动窗口样本
Args:
data: numpy array, shape (n_samples, n_features)
seq_len: 输入序列长度(用多少历史预测)
horizon: 预测未来多少步
Returns:
X: shape (n_samples, seq_len, n_features)
y: shape (n_samples, horizon, n_features) 或 (n_samples, n_features)
"""
X, y = [], []
for i in range(len(data) - seq_len - horizon + 1):
# 输入:从 i 到 i+seq_len-1
X.append(data[i:i + seq_len])
# 标签:从 i+seq_len 到 i+seq_len+horizon-1
y.append(data[i + seq_len:i + seq_len + horizon])
X = np.array(X)
y = np.array(y)
# 如果 horizon=1,去掉中间维度
if horizon == 1:
y = y.squeeze(1)
return X, y
# 示例:单步预测
data = np.arange(1, 11).reshape(-1, 1) # 1 到 10
X, y = create_sliding_windows(data, seq_len=3, horizon=1)
print("单步预测 (horizon=1):")
print(f"数据: {data.flatten()}")
print(f"X 形状: {X.shape}, y 形状: {y.shape}")
for i in range(len(X)):
print(f"样本 {i+1}: 输入 {X[i].flatten()} → 预测 {y[i].flatten()}")
# 示例:多步预测
X, y = create_sliding_windows(data, seq_len=3, horizon=2)
print("\n多步预测 (horizon=2):")
print(f"X 形状: {X.shape}, y 形状: {y.shape}")
for i in range(len(X)):
print(f"样本 {i+1}: 输入 {X[i].flatten()} → 预测 {y[i].flatten()}")seq_len 和 horizon 的选择
| 参数 | 含义 | 选择建议 | 典型值 |
|---|---|---|---|
seq_len | 用多少历史预测 | 太短→信息不足,太长→噪声和梯度问题 | 20-60(日频) |
horizon | 预测未来几步 | 1步(默认),多步误差累积 | 1-5 |
# 不同 seq_len 的效果对比
import numpy as np
# 生成带趋势和噪声的数据
np.random.seed(42)
t = np.linspace(0, 10, 100)
data = 2 * t + np.random.randn(100) * 0.5
seq_lengths = [5, 10, 20, 40]
for seq_len in seq_lengths:
X, y = create_sliding_windows(data.reshape(-1, 1), seq_len, horizon=1)
print(f"seq_len={seq_len}: 样本数={len(X)}, 信息比例={seq_len/len(data)*100:.1f}%")数据标准化
为什么需要标准化
未标准化的数据问题:
特征1: [0.001, 0.002, 0.003, ...] (收益率,很小)
特征2: [1000000, 1000050, ...] (市值,很大)
问题:
1. 大数值特征主导梯度更新
2. 损失函数 landscape 不规则
3. 优化困难,收敛慢
标准化后:
所有特征都在相似尺度(均值0,标准差1)
梯度更新更平衡
Z-score 标准化
import numpy as np
def zscore_normalize(data, mean=None, std=None):
"""
Z-score 标准化: (x - mean) / std
Args:
data: numpy array
mean: 预计算的均值(用于测试集)
std: 预计算的标准差(用于测试集)
Returns:
normalized_data, mean, std
"""
if mean is None:
mean = np.mean(data, axis=0)
if std is None:
std = np.std(data, axis=0)
# 避免除以0
std = np.where(std == 0, 1, std)
normalized = (data - mean) / std
return normalized, mean, std
# 示例
data = np.array([[1, 100], [2, 200], [3, 300], [4, 400]], dtype=float)
normalized, mean, std = zscore_normalize(data)
print("原始数据:")
print(data)
print(f"\n均值: {mean}")
print(f"标准差: {std}")
print("\n标准化后:")
print(normalized)
print(f"\n验证: 均值≈{np.mean(normalized, axis=0)}, 标准差≈{np.std(normalized, axis=0)}")Min-Max 标准化
def minmax_normalize(data, min_val=None, max_val=None, feature_range=(0, 1)):
"""
Min-Max 标准化: (x - min) / (max - min) * (new_max - new_min) + new_min
Args:
data: numpy array
min_val: 预计算的最小值
max_val: 预计算的最大值
feature_range: 目标范围,默认 (0, 1)
Returns:
normalized_data, min_val, max_val
"""
if min_val is None:
min_val = np.min(data, axis=0)
if max_val is None:
max_val = np.max(data, axis=0)
# 避免除以0
range_val = max_val - min_val
range_val = np.where(range_val == 0, 1, range_val)
# 标准化到 [0, 1]
normalized = (data - min_val) / range_val
# 缩放到目标范围
new_min, new_max = feature_range
normalized = normalized * (new_max - new_min) + new_min
return normalized, min_val, max_val
# 示例
data = np.array([[1, 100], [2, 200], [3, 300], [4, 400]], dtype=float)
normalized, min_val, max_val = minmax_normalize(data)
print("原始数据:")
print(data)
print(f"\n最小值: {min_val}")
print(f"最大值: {max_val}")
print("\n标准化后 [0, 1]:")
print(normalized)标准化方法对比
| 方法 | 公式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|
| Z-score | 数据分布近似正态 | 保留离群点信息 | 对离群值敏感 | |
| Min-Max | 需要固定范围 | 简单直观 | 对离群值极敏感 | |
| Robust | 有离群值 | 抗离群值 | 分布改变大 |
⚠️ 标准化泄漏问题
# ❌ 错误做法:全样本标准化
def normalize_wrong(data):
"""错误:用全样本统计量标准化"""
mean = np.mean(data, axis=0)
std = np.std(data, axis=0)
return (data - mean) / std
# 问题:模型"见过"测试集的分布
# ✅ 正确做法:只用训练集统计量
def normalize_correct(train_data, val_data, test_data):
"""正确:用训练集统计量标准化所有数据"""
# 只用训练集计算统计量
train_mean = np.mean(train_data, axis=0)
train_std = np.std(train_data, axis=0)
# 用训练集统计量标准化所有数据
train_normalized = (train_data - train_mean) / train_std
val_normalized = (val_data - train_mean) / train_std
test_normalized = (test_data - train_mean) / train_std
return train_normalized, val_normalized, test_normalized
# 示例
np.random.seed(42)
train = np.random.randn(100, 3) # 训练集
val = np.random.randn(20, 3) + 0.1 # 验证集(偏移)
test = np.random.randn(20, 3) + 0.2 # 测试集(更偏移)
train_norm, val_norm, test_norm = normalize_correct(train, val, test)
print("训练集统计量(用于标准化):")
print(f"均值: {np.mean(train, axis=0)}")
print(f"标准差: {np.std(train, axis=0)}")
print("\n标准化后的均值(应该接近训练集的均值):")
print(f"训练集: {np.mean(train_norm, axis=0)}")
print(f"验证集: {np.mean(val_norm, axis=0)}")
print(f"测试集: {np.mean(test_norm, axis=0)}")量化特有:横截面标准化
在量化中,每期内部进行标准化(横截面中性化):
def cross_sectional_normalize(data):
"""
横截面 Z-score 标准化
对每个时间步,在该时间截面上标准化所有股票
Args:
data: shape (n_dates, n_stocks, n_features)
Returns:
normalized_data
"""
normalized = np.zeros_like(data)
for t in range(data.shape[0]): # 遍历每个时间点
for f in range(data.shape[2]): # 遍历每个特征
# 获取该时间点该特征的所有股票值
截面 = data[t, :, f]
# 计算该截面的均值和标准差
mean = np.mean(截面)
std = np.std(截面)
# 标准化
if std > 0:
normalized[t, :, f] = (截面 - mean) / std
else:
normalized[t, :, f] = 0
return normalized
# 示例:3天 × 5只股票 × 2个特征
np.random.seed(42)
data = np.random.randn(3, 5, 2) * 10 + 100 # 模拟价格数据
print("原始数据(第1天):")
print(data[0])
normalized = cross_sectional_normalize(data)
print("\n横截面标准化后(第1天):")
print(normalized[0])
print(f"\n验证:每期每特征的均值≈0: {np.mean(normalized[0], axis=0)}")Rolling 标准化
对于非平稳数据,使用滚动窗口标准化:
def rolling_normalize(data, window=20):
"""
滚动窗口标准化
对每个样本,用前 window 个样本的统计量标准化
Args:
data: shape (n_samples, n_features)
window: 滚动窗口大小
Returns:
normalized_data
"""
normalized = np.zeros_like(data)
for i in range(len(data)):
if i < window:
# 数据不足,用累积统计量
mean = np.mean(data[:i+1], axis=0)
std = np.std(data[:i+1], axis=0)
else:
# 用前 window 个样本
mean = np.mean(data[i-window:i], axis=0)
std = np.std(data[i-window:i], axis=0)
std = np.where(std == 0, 1, std)
normalized[i] = (data[i] - mean) / std
return normalized
# 示例:带趋势的数据
np.random.seed(42)
t = np.linspace(0, 10, 100)
data = 2 * t.reshape(-1, 1) + np.random.randn(100, 1) * 0.5
normalized = rolling_normalize(data, window=20)
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
axes[0].plot(data)
axes[0].set_title('原始数据(带趋势)')
axes[0].grid(True)
axes[1].plot(normalized)
axes[1].set_title('Rolling 标准化后')
axes[1].grid(True)
plt.tight_layout()
plt.show()PyTorch Dataset 实现
完整的 TimeSeriesDataset
import torch
from torch.utils.data import Dataset
import numpy as np
class TimeSeriesDataset(Dataset):
"""
时序数据集类
支持多特征、单标签、多步预测
"""
def __init__(
self,
data,
seq_len,
horizon=1,
target_col=None, # None 表示用所有特征预测,或指定特征索引
transform=None
):
"""
Args:
data: numpy array, shape (n_samples, n_features) 或 pandas DataFrame
seq_len: 输入序列长度
horizon: 预测未来多少步
target_col: 目标列索引,None 表示使用所有特征
transform: 数据变换函数
"""
self.data = torch.tensor(data, dtype=torch.float32)
self.seq_len = seq_len
self.horizon = horizon
self.target_col = target_col
self.transform = transform
def __len__(self):
return len(self.data) - self.seq_len - self.horizon + 1
def __getitem__(self, idx):
# 输入序列
x = self.data[idx:idx + self.seq_len]
# 标签
if self.target_col is not None:
# 只用特定特征作为标签
y = self.data[idx + self.seq_len:idx + self.seq_len + self.horizon, self.target_col]
else:
# 用所有特征作为标签
y = self.data[idx + self.seq_len:idx + self.seq_len + self.horizon]
# 如果 horizon=1,去掉中间维度
if self.horizon == 1 and len(y.shape) > 1:
y = y.squeeze(1)
# 应用变换
if self.transform:
x = self.transform(x)
return x, y
# 使用示例
np.random.seed(42)
data = np.random.randn(500, 10) # 500个样本,10个特征
# 创建数据集(用前20天预测第21天的第0个特征)
dataset = TimeSeriesDataset(
data=data,
seq_len=20,
horizon=1,
target_col=0 # 预测第0个特征(如收益率)
)
print(f"数据集大小: {len(dataset)}")
# 获取第一个样本
x, y = dataset[0]
print(f"输入形状: {x.shape}") # (20, 10)
print(f"标签形状: {y.shape}") # () 标量
# 获取一个多步预测样本
dataset_multi = TimeSeriesDataset(
data=data,
seq_len=20,
horizon=5, # 预测未来5步
target_col=0
)
x, y = dataset_multi[0]
print(f"\n多步预测 - 输入形状: {x.shape}") # (20, 10)
print(f"多步预测 - 标签形状: {y.shape}") # (5,) 5个时间步支持多特征单标签的 Dataset
class FinancialDataset(Dataset):
"""
量化专用数据集
特征:多个技术指标
标签:未来收益率(单列)
"""
def __init__(
self,
features, # shape: (n_dates, n_stocks, n_features)
labels, # shape: (n_dates, n_stocks)
seq_len=20,
horizon=1
):
"""
Args:
features: 多维特征数组
labels: 标签数组(未来收益率)
seq_len: 序列长度
horizon: 预测步数
"""
self.features = torch.tensor(features, dtype=torch.float32)
self.labels = torch.tensor(labels, dtype=torch.float32)
self.seq_len = seq_len
self.horizon = horizon
# 检查维度
assert len(features.shape) == 3, "features 应该是 (n_dates, n_stocks, n_features)"
assert len(labels.shape) == 2, "labels 应该是 (n_dates, n_stocks)"
def __len__(self):
return self.features.shape[0] - self.seq_len - self.horizon + 1
def __getitem__(self, idx):
# 特征序列
x = self.features[idx:idx + self.seq_len] # (seq_len, n_stocks, n_features)
# 标签
if self.horizon == 1:
y = self.labels[idx + self.seq_len] # (n_stocks,)
else:
y = self.labels[idx + self.seq_len:idx + self.seq_len + self.horizon] # (horizon, n_stocks)
return x, y
# 示例:模拟量化数据
np.random.seed(42)
n_dates = 500
n_stocks = 100
n_features = 10
# 特征:(日期, 股票, 特征)
features = np.random.randn(n_dates, n_stocks, n_features)
# 标签:(日期, 股票) 未来收益率
labels = np.random.randn(n_dates, n_stocks) * 0.02
dataset = FinancialDataset(features, labels, seq_len=20, horizon=1)
x, y = dataset[0]
print(f"输入形状: {x.shape}") # (20, 100, 10)
print(f"标签形状: {y.shape}") # (100,)DataLoader 配置
基本配置
from torch.utils.data import DataLoader
# 创建 DataLoader
dataloader = DataLoader(
dataset,
batch_size=32, # 批次大小
shuffle=False, # 时序数据不打乱!
num_workers=0, # 数据加载进程数
pin_memory=False, # 是否锁页内存
drop_last=False # 是否丢弃最后不完整的批次
)
# 遍历
for batch_idx, (x_batch, y_batch) in enumerate(dataloader):
print(f"批次 {batch_idx}: X={x_batch.shape}, y={y_batch.shape}")
if batch_idx >= 2:
breakDataLoader 参数详解
| 参数 | 说明 | 量化场景建议 |
|---|---|---|
batch_size | 批次大小 | 32-256,取决于内存 |
shuffle | 是否打乱 | False(时序因果性) |
num_workers | 加载进程数 | 0(Windows)/ 4(Linux) |
pin_memory | 锁页内存 | True(GPU 训练) |
drop_last | 丢弃末批次 | True(训练)/ False(验证) |
为什么 shuffle=False?
# ❌ 错误:shuffle=True
# 问题:破坏时序因果性
# 训练时可能用 t=100 预测 t=50
# ✅ 正确:shuffle=False
# 保持时间顺序,确保因果性数据划分
时序划分
import numpy as np
def time_series_split(data, train_ratio=0.7, val_ratio=0.15):
"""
时序数据划分
按时间顺序划分训练集、验证集、测试集
Args:
data: numpy array, shape (n_samples, n_features)
train_ratio: 训练集比例
val_ratio: 验证集比例
Returns:
train, val, test
"""
n = len(data)
train_end = int(n * train_ratio)
val_end = int(n * (train_ratio + val_ratio))
train = data[:train_end]
val = data[train_end:val_end]
test = data[val_end:]
print(f"数据划分:")
print(f" 训练集: {len(train)} 样本 ({train_ratio*100:.0f}%)")
print(f" 验证集: {len(val)} 样本 ({val_ratio*100:.0f}%)")
print(f" 测试集: {len(test)} 样本 ({(1-train_ratio-val_ratio)*100:.0f}%)")
return train, val, test
# 示例
np.random.seed(42)
data = np.random.randn(1000, 10)
train, val, test = time_series_split(data, train_ratio=0.7, val_ratio=0.15)
# 可视化
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 3))
plt.axvspan(0, len(train), alpha=0.3, color='green', label='Train')
plt.axvspan(len(train), len(train)+len(val), alpha=0.3, color='yellow', label='Val')
plt.axvspan(len(train)+len(val), len(data), alpha=0.3, color='red', label='Test')
plt.legend()
plt.title('时序数据划分')
plt.show()滚动划分(Rolling Split)
用于模拟实盘滚动训练:
def rolling_time_series_split(data, n_splits=5, train_size=0.6, val_size=0.2):
"""
滚动时序划分
生成多个训练-验证-测试划分,模拟滚动重训练
Args:
data: numpy array
n_splits: 划分数量
train_size: 训练集大小(比例)
val_size: 验证集大小(比例)
Yields:
(train, val, test) 元组
"""
n = len(data)
train_len = int(n * train_size)
val_len = int(n * val_size)
# 滚动起点
step = (n - train_len - val_len) // n_splits
for i in range(n_splits):
start = i * step
train_end = start + train_len
val_end = train_end + val_len
train = data[start:train_end]
val = data[train_end:val_end]
test = data[val_end:val_end + step] # 测试集大小等于滚动步长
yield train, val, test
# 示例
np.random.seed(42)
data = np.random.randn(1000, 10)
for fold, (train, val, test) in enumerate(rolling_time_series_split(data, n_splits=3)):
print(f"\nFold {fold + 1}:")
print(f" 训练: {train.shape[0]}, 验证: {val.shape[0]}, 测试: {test.shape[0]}")
print(f" 日期范围: 训练[0:{train.shape[0]}], 验证[{train.shape[0]}:{train.shape[0]+val.shape[0]}]")完整数据准备流水线
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
class QuantDataPipeline:
"""量化数据准备流水线"""
def __init__(
self,
data,
seq_len=20,
horizon=1,
target_col=0,
train_ratio=0.7,
val_ratio=0.15,
batch_size=32,
normalize_method='zscore'
):
"""
Args:
data: numpy array, shape (n_samples, n_features)
seq_len: 序列长度
horizon: 预测步数
target_col: 目标列索引
train_ratio: 训练集比例
val_ratio: 验证集比例
batch_size: 批次大小
normalize_method: 标准化方法 ('zscore', 'minmax', 'robust')
"""
self.seq_len = seq_len
self.horizon = horizon
self.target_col = target_col
self.batch_size = batch_size
self.normalize_method = normalize_method
# 划分数据
self.train, self.val, self.test = self._split_data(data, train_ratio, val_ratio)
# 标准化
self.train_norm, self.val_norm, self.test_norm = self._normalize()
# 创建 Dataset 和 DataLoader
self.train_loader, self.val_loader, self.test_loader = self._create_loaders()
def _split_data(self, data, train_ratio, val_ratio):
"""时序划分"""
n = len(data)
train_end = int(n * train_ratio)
val_end = int(n * (train_ratio + val_ratio))
return data[:train_end], data[train_end:val_end], data[val_end:]
def _normalize(self):
"""标准化"""
if self.normalize_method == 'zscore':
# 只用训练集统计量
mean = np.mean(self.train, axis=0)
std = np.std(self.train, axis=0)
std = np.where(std == 0, 1, std)
train_norm = (self.train - mean) / std
val_norm = (self.val - mean) / std
test_norm = (self.test - mean) / std
elif self.normalize_method == 'minmax':
min_val = np.min(self.train, axis=0)
max_val = np.max(self.train, axis=0)
range_val = max_val - min_val
range_val = np.where(range_val == 0, 1, range_val)
train_norm = (self.train - min_val) / range_val
val_norm = (self.val - min_val) / range_val
test_norm = (self.test - min_val) / range_val
else:
train_norm, val_norm, test_norm = self.train, self.val, self.test
return train_norm, val_norm, test_norm
def _create_loaders(self):
"""创建 DataLoader"""
train_dataset = TimeSeriesDataset(
self.train_norm, self.seq_len, self.horizon, self.target_col
)
val_dataset = TimeSeriesDataset(
self.val_norm, self.seq_len, self.horizon, self.target_col
)
test_dataset = TimeSeriesDataset(
self.test_norm, self.seq_len, self.horizon, self.target_col
)
train_loader = DataLoader(
train_dataset, batch_size=self.batch_size, shuffle=False
)
val_loader = DataLoader(
val_dataset, batch_size=self.batch_size, shuffle=False
)
test_loader = DataLoader(
test_dataset, batch_size=self.batch_size, shuffle=False
)
return train_loader, val_loader, test_loader
# 使用示例
np.random.seed(42)
data = np.random.randn(1000, 10)
pipeline = QuantDataPipeline(
data=data,
seq_len=20,
horizon=1,
target_col=0,
train_ratio=0.7,
val_ratio=0.15,
batch_size=32,
normalize_method='zscore'
)
print("数据流水线创建完成!")
print(f"\n训练批次数: {len(pipeline.train_loader)}")
print(f"验证批次数: {len(pipeline.val_loader)}")
print(f"测试批次数: {len(pipeline.test_loader)}")
# 获取一个批次
for x_batch, y_batch in pipeline.train_loader:
print(f"\n批次形状: X={x_batch.shape}, y={y_batch.shape}")
break数据增强(简单介绍)
加噪声
def add_noise(x, noise_level=0.01):
"""
添加高斯噪声
Args:
x: 输入数据
noise_level: 噪声标准差比例
Returns:
添加噪声后的数据
"""
noise = torch.randn_like(x) * noise_level
return x + noise
# 示例
x = torch.randn(10, 5)
x_noisy = add_noise(x, noise_level=0.1)
print(f"原始数据标准差: {x.std():.4f}")
print(f"加噪声后标准差: {x_noisy.std():.4f}")时间反转(注意因果性)
# ⚠️ 警告:时序预测中慎用时间反转!
# 问题:破坏因果性
# 适用场景:
# 1. 分类任务(不是预测)
# 2. 数据增强用于预训练
def time_reverse(x):
"""时间维度反转"""
return torch.flip(x, dims=[1]) # 反转 seq_len 维度
# 使用建议:
# ❌ 预测任务:不要用
# ✅ 表示学习/预训练:可以使用核心知识点总结
1. 滑动窗口
# 单步预测
X = data[i:i+seq_len]
y = data[i+seq_len]
# 多步预测
y = data[i+seq_len:i+seq_len+horizon]2. 标准化
# ⚠️ 只用训练集统计量
mean = np.mean(train_data, axis=0)
std = np.std(train_data, axis=0)
train_norm = (train_data - mean) / std
val_norm = (val_data - mean) / std3. 时序划分
# 按时间顺序,不打乱
train = data[:train_end]
val = data[train_end:val_end]
test = data[val_end:]4. DataLoader
# shuffle=False 保持时序
dataloader = DataLoader(
dataset,
batch_size=32,
shuffle=False # 时序数据不打乱
)练习建议
- 实现滚动窗口:手动实现一个带滚动窗口的 Dataset
- 对比标准化方法:用不同标准化方法训练模型,比较效果
- 可视化数据流:绘制数据从原始到训练的完整流程图
下一节
在 05-模型训练优化.md 中,我们将学习如何训练和优化深度学习模型。