04-时序数据处理

本节介绍量化时序数据的处理方法,包括滑动窗口、标准化、Dataset 构建和数据划分等核心技巧。

滑动窗口(Sliding Window)

原理

滑动窗口是时序建模的核心技术,将一维时间序列转换为监督学习样本。

滑动窗口示意图:

原始序列:
    t=1    t=2    t=3    t=4    t=5    t=6    t=7    t=8
    [A]    [B]    [C]    [D]    [E]    [F]    [G]    [H]

seq_len=3, horizon=1 的滑动窗口:

样本1:  [A] [B] [C] → [D]
        └───┬───┘   └┬┘
         输入(X)   标签(Y)

样本2:      [B] [C] [D] → [E]
            └───┬───┘   └┬┘
             输入(X)   标签(Y)

样本3:         [C] [D] [E] → [F]
               └───┬───┘   └┬┘
                输入(X)   标签(Y)

...

窗口向右滑动,每次移动 1 步

单步预测 vs 多步预测

import numpy as np
 
def create_sliding_windows(data, seq_len, horizon=1):
    """
    Build supervised (input window, label window) pairs from a series.

    Args:
        data: numpy array, shape (n_samples, n_features)
        seq_len: length of each input window (how much history to use)
        horizon: how many future steps to predict

    Returns:
        X: shape (n_windows, seq_len, n_features)
        y: shape (n_windows, horizon, n_features), or
           (n_windows, n_features) when horizon == 1
    """
    n_windows = len(data) - seq_len - horizon + 1

    # Window at position `start` covers [start, start+seq_len) as input
    # and [start+seq_len, start+seq_len+horizon) as the label.
    X = np.array([data[start:start + seq_len] for start in range(n_windows)])
    y = np.array([data[start + seq_len:start + seq_len + horizon]
                  for start in range(n_windows)])

    # Single-step prediction: drop the singleton horizon axis.
    if horizon == 1:
        y = y.squeeze(1)

    return X, y
 
# Example: single-step prediction
data = np.arange(1, 11).reshape(-1, 1)  # values 1 through 10
X, y = create_sliding_windows(data, seq_len=3, horizon=1)

print("单步预测 (horizon=1):")
print(f"数据: {data.flatten()}")
print(f"X 形状: {X.shape}, y 形状: {y.shape}")
for i in range(len(X)):
    print(f"样本 {i+1}: 输入 {X[i].flatten()} → 预测 {y[i].flatten()}")

# Example: multi-step prediction
X, y = create_sliding_windows(data, seq_len=3, horizon=2)
print("\n多步预测 (horizon=2):")
print(f"X 形状: {X.shape}, y 形状: {y.shape}")
for i in range(len(X)):
    print(f"样本 {i+1}: 输入 {X[i].flatten()} → 预测 {y[i].flatten()}")

seq_len 和 horizon 的选择

| 参数 | 含义 | 选择建议 | 典型值 |
| --- | --- | --- | --- |
| seq_len | 用多少历史预测 | 太短→信息不足,太长→噪声和梯度问题 | 20-60(日频) |
| horizon | 预测未来几步 | 1步(默认),多步误差累积 | 1-5 |
# Compare the effect of different seq_len values
import numpy as np

# Generate data with a linear trend plus noise
np.random.seed(42)
t = np.linspace(0, 10, 100)
data = 2 * t + np.random.randn(100) * 0.5

seq_lengths = [5, 10, 20, 40]
for seq_len in seq_lengths:
    X, y = create_sliding_windows(data.reshape(-1, 1), seq_len, horizon=1)
    print(f"seq_len={seq_len}: 样本数={len(X)}, 信息比例={seq_len/len(data)*100:.1f}%")

数据标准化

为什么需要标准化

未标准化的数据问题:

特征1: [0.001, 0.002, 0.003, ...]  (收益率,很小)
特征2: [1000000, 1000050, ...]     (市值,很大)

问题:
  1. 大数值特征主导梯度更新
  2. 损失函数 landscape 不规则
  3. 优化困难,收敛慢

标准化后:
  所有特征都在相似尺度(均值0,标准差1)
  梯度更新更平衡

Z-score 标准化

import numpy as np
 
def zscore_normalize(data, mean=None, std=None):
    """
    Z-score standardization: (x - mean) / std.

    Args:
        data: numpy array
        mean: precomputed mean (e.g. from the training set); derived from
            `data` when None
        std: precomputed standard deviation; derived from `data` when None

    Returns:
        (normalized_data, mean, std) — std has zero entries replaced by 1
    """
    mean = np.mean(data, axis=0) if mean is None else mean
    std = np.std(data, axis=0) if std is None else std

    # Zero-variance features would divide by zero; map their std to 1.
    std = np.where(std == 0, 1, std)

    return (data - mean) / std, mean, std
 
# Example
data = np.array([[1, 100], [2, 200], [3, 300], [4, 400]], dtype=float)
normalized, mean, std = zscore_normalize(data)

print("原始数据:")
print(data)
print(f"\n均值: {mean}")
print(f"标准差: {std}")
print("\n标准化后:")
print(normalized)
print(f"\n验证: 均值≈{np.mean(normalized, axis=0)}, 标准差≈{np.std(normalized, axis=0)}")

Min-Max 标准化

def minmax_normalize(data, min_val=None, max_val=None, feature_range=(0, 1)):
    """
    Min-max scaling: (x - min) / (max - min) * (new_max - new_min) + new_min.

    Args:
        data: numpy array
        min_val: precomputed per-feature minimum; derived from `data` if None
        max_val: precomputed per-feature maximum; derived from `data` if None
        feature_range: target interval, defaults to (0, 1)

    Returns:
        (normalized_data, min_val, max_val)
    """
    if min_val is None:
        min_val = np.min(data, axis=0)
    if max_val is None:
        max_val = np.max(data, axis=0)

    # Constant features have zero range; substitute 1 to avoid dividing by 0.
    span = max_val - min_val
    span = np.where(span == 0, 1, span)

    # Scale to [0, 1], then shift/stretch into the requested interval.
    new_min, new_max = feature_range
    scaled = (data - min_val) / span * (new_max - new_min) + new_min

    return scaled, min_val, max_val
 
# Example
data = np.array([[1, 100], [2, 200], [3, 300], [4, 400]], dtype=float)
normalized, min_val, max_val = minmax_normalize(data)

print("原始数据:")
print(data)
print(f"\n最小值: {min_val}")
print(f"最大值: {max_val}")
print("\n标准化后 [0, 1]:")
print(normalized)

标准化方法对比

| 方法 | 公式 | 适用场景 | 优点 | 缺点 |
| --- | --- | --- | --- | --- |
| Z-score | (x − μ) / σ | 数据分布近似正态 | 保留离群点信息 | 对离群值敏感 |
| Min-Max | (x − min) / (max − min) | 需要固定范围 | 简单直观 | 对离群值极敏感 |
| Robust | (x − median) / IQR | 有离群值 | 抗离群值 | 分布改变大 |

⚠️ 标准化泄漏问题

# ❌ 错误做法:全样本标准化
def normalize_wrong(data):
    """WRONG on purpose: normalizes with statistics of the FULL sample.

    If ``data`` contains validation/test rows, their distribution leaks into
    ``mean``/``std`` — the model indirectly "sees" the test set
    (look-ahead bias). Kept here as a counter-example.
    """
    mean = np.mean(data, axis=0)
    std = np.std(data, axis=0)
    return (data - mean) / std
 
# 问题:模型"见过"测试集的分布
 
# ✅ 正确做法:只用训练集统计量
def normalize_correct(train_data, val_data, test_data):
    """Correct approach: normalize every split with TRAINING-set statistics.

    Using only the training set to compute mean/std prevents information
    about the validation/test distribution from leaking into the model.

    Args:
        train_data, val_data, test_data: numpy arrays, shape (n, n_features)

    Returns:
        (train_normalized, val_normalized, test_normalized)
    """
    # Statistics come exclusively from the training split.
    train_mean = np.mean(train_data, axis=0)
    train_std = np.std(train_data, axis=0)

    # Guard against zero-variance features (consistent with zscore_normalize
    # elsewhere in this file) — otherwise constant columns divide by zero.
    train_std = np.where(train_std == 0, 1, train_std)

    # Apply the same affine transform to all three splits.
    train_normalized = (train_data - train_mean) / train_std
    val_normalized = (val_data - train_mean) / train_std
    test_normalized = (test_data - train_mean) / train_std

    return train_normalized, val_normalized, test_normalized
 
# Example
np.random.seed(42)
train = np.random.randn(100, 3)      # training set
val = np.random.randn(20, 3) + 0.1   # validation set (mean shifted)
test = np.random.randn(20, 3) + 0.2  # test set (shifted further)

train_norm, val_norm, test_norm = normalize_correct(train, val, test)

print("训练集统计量(用于标准化):")
print(f"均值: {np.mean(train, axis=0)}")
print(f"标准差: {np.std(train, axis=0)}")

print("\n标准化后的均值(应该接近训练集的均值):")
print(f"训练集: {np.mean(train_norm, axis=0)}")
print(f"验证集: {np.mean(val_norm, axis=0)}")
print(f"测试集: {np.mean(test_norm, axis=0)}")

量化特有:横截面标准化

在量化中,每期内部进行标准化(横截面中性化):

def cross_sectional_normalize(data):
    """
    Cross-sectional z-score normalization.

    For every date and every feature, standardize the values of all stocks
    in that cross-section to mean 0 / std 1. Cross-sections with zero
    dispersion are mapped to all zeros.

    (Fix: the original loop body was inconsistently indented and did not
    parse; the computation is also vectorized over dates and features.)

    Args:
        data: shape (n_dates, n_stocks, n_features)

    Returns:
        normalized array of the same shape
    """
    # Per-(date, feature) statistics across the stock axis (axis=1).
    mean = np.mean(data, axis=1, keepdims=True)
    std = np.std(data, axis=1, keepdims=True)

    # Avoid division by zero; degenerate cross-sections become all zeros,
    # matching the original `if std > 0 ... else 0` branching.
    safe_std = np.where(std > 0, std, 1.0)
    return np.where(std > 0, (data - mean) / safe_std, 0.0)
 
# Example: 3 days x 5 stocks x 2 features
np.random.seed(42)
data = np.random.randn(3, 5, 2) * 10 + 100  # simulated price-like data

print("原始数据(第1天):")
print(data[0])

normalized = cross_sectional_normalize(data)
print("\n横截面标准化后(第1天):")
print(normalized[0])
print(f"\n验证:每期每特征的均值≈0: {np.mean(normalized[0], axis=0)}")

Rolling 标准化

对于非平稳数据,使用滚动窗口标准化:

def rolling_normalize(data, window=20):
    """
    Rolling-window standardization.

    Each row is standardized with mean/std computed from a trailing window,
    which keeps non-stationary (e.g. trending) series on a stable scale.

    NOTE(review): during warm-up (i < window) the statistics come from
    data[:i+1], which *includes* the current row — a mild look-ahead; from
    i >= window the window data[i-window:i] excludes it. Confirm this is
    acceptable for the intended use.

    Args:
        data: shape (n_samples, n_features)
        window: trailing-window length

    Returns:
        normalized_data, same shape as `data`
    """
    out = np.zeros_like(data)

    for idx, row in enumerate(data):
        # Cumulative statistics while the window is still filling up.
        history = data[:idx + 1] if idx < window else data[idx - window:idx]

        mu = np.mean(history, axis=0)
        sigma = np.std(history, axis=0)
        sigma = np.where(sigma == 0, 1, sigma)  # guard zero dispersion

        out[idx] = (row - mu) / sigma

    return out
 
# Example: data with a linear trend
np.random.seed(42)
t = np.linspace(0, 10, 100)
data = 2 * t.reshape(-1, 1) + np.random.randn(100, 1) * 0.5

normalized = rolling_normalize(data, window=20)

import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 1, figsize=(12, 6))
axes[0].plot(data)
axes[0].set_title('原始数据(带趋势)')
axes[0].grid(True)

axes[1].plot(normalized)
axes[1].set_title('Rolling 标准化后')
axes[1].grid(True)
plt.tight_layout()
plt.show()

PyTorch Dataset 实现

完整的 TimeSeriesDataset

import torch
from torch.utils.data import Dataset
import numpy as np
 
class TimeSeriesDataset(Dataset):
    """
    Sliding-window time-series dataset.

    Supports multi-feature inputs, an optional single target column, and
    multi-step (horizon > 1) prediction.
    """
    def __init__(
        self,
        data,
        seq_len,
        horizon=1,
        target_col=None,  # None -> predict all features; int -> that feature only
        transform=None
    ):
        """
        Args:
            data: numpy array of shape (n_samples, n_features) or pandas DataFrame
            seq_len: length of each input window
            horizon: how many future steps to predict
            target_col: target column index, or None to use all features
            transform: optional callable applied to each input window
        """
        # np.asarray accepts both ndarrays and DataFrames; torch.tensor alone
        # does not accept a DataFrame, although the docstring promised it.
        self.data = torch.tensor(np.asarray(data), dtype=torch.float32)
        self.seq_len = seq_len
        self.horizon = horizon
        self.target_col = target_col
        self.transform = transform

    def __len__(self):
        # Number of complete (input window, label window) pairs.
        return len(self.data) - self.seq_len - self.horizon + 1

    def __getitem__(self, idx):
        # Input window: rows [idx, idx + seq_len)
        x = self.data[idx:idx + self.seq_len]

        label_start = idx + self.seq_len
        if self.target_col is not None:
            # Single target feature -> label shape (horizon,)
            y = self.data[label_start:label_start + self.horizon, self.target_col]
        else:
            # All features -> label shape (horizon, n_features)
            y = self.data[label_start:label_start + self.horizon]

        # Single-step prediction: drop the leading time dimension.
        # Bug fix: the time axis here is dim 0, not dim 1 — the original
        # `y.squeeze(1)` was a no-op for (1, n_features) labels and never
        # produced the scalar labels the usage example documents.
        if self.horizon == 1:
            y = y.squeeze(0)

        # Optional per-window transform (e.g. augmentation).
        if self.transform:
            x = self.transform(x)

        return x, y
 
# Usage example
np.random.seed(42)
data = np.random.randn(500, 10)  # 500 samples, 10 features

# Build the dataset (use the previous 20 days to predict feature 0 on day 21)
dataset = TimeSeriesDataset(
    data=data,
    seq_len=20,
    horizon=1,
    target_col=0  # predict feature 0 (e.g. returns)
)

print(f"数据集大小: {len(dataset)}")

# Fetch the first sample
x, y = dataset[0]
print(f"输入形状: {x.shape}")  # (20, 10)
print(f"标签形状: {y.shape}")  # intended () scalar — with squeeze(1) it is (1,); verify

# Multi-step prediction sample
dataset_multi = TimeSeriesDataset(
    data=data,
    seq_len=20,
    horizon=5,  # predict 5 steps ahead
    target_col=0
)

x, y = dataset_multi[0]
print(f"\n多步预测 - 输入形状: {x.shape}")  # (20, 10)
print(f"多步预测 - 标签形状: {y.shape}")  # (5,) five time steps

支持多特征单标签的 Dataset

class FinancialDataset(Dataset):
    """
    Quant-specific dataset.

    Features: several technical indicators per stock per date.
    Labels: future returns (a single value per stock per date).
    """
    def __init__(
        self,
        features,  # shape: (n_dates, n_stocks, n_features)
        labels,    # shape: (n_dates, n_stocks)
        seq_len=20,
        horizon=1
    ):
        """
        Args:
            features: 3-D feature array (n_dates, n_stocks, n_features)
            labels: label array (n_dates, n_stocks), e.g. future returns
            seq_len: input window length
            horizon: number of prediction steps
        """
        # Validate shapes up front.
        assert len(features.shape) == 3, "features 应该是 (n_dates, n_stocks, n_features)"
        assert len(labels.shape) == 2, "labels 应该是 (n_dates, n_stocks)"

        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)
        self.seq_len = seq_len
        self.horizon = horizon

    def __len__(self):
        return self.features.shape[0] - self.seq_len - self.horizon + 1

    def __getitem__(self, idx):
        window_end = idx + self.seq_len

        # Feature window: (seq_len, n_stocks, n_features)
        x = self.features[idx:window_end]

        if self.horizon == 1:
            y = self.labels[window_end]                            # (n_stocks,)
        else:
            y = self.labels[window_end:window_end + self.horizon]  # (horizon, n_stocks)

        return x, y
 
# Example: simulated quant data
np.random.seed(42)
n_dates = 500
n_stocks = 100
n_features = 10

# Features: (date, stock, feature)
features = np.random.randn(n_dates, n_stocks, n_features)

# Labels: (date, stock) — future returns
labels = np.random.randn(n_dates, n_stocks) * 0.02

dataset = FinancialDataset(features, labels, seq_len=20, horizon=1)
x, y = dataset[0]
print(f"输入形状: {x.shape}")  # (20, 100, 10)
print(f"标签形状: {y.shape}")  # (100,)

DataLoader 配置

基本配置

from torch.utils.data import DataLoader

# Build the DataLoader
dataloader = DataLoader(
    dataset,
    batch_size=32,      # batch size
    shuffle=False,      # never shuffle time-series data!
    num_workers=0,      # number of loading worker processes
    pin_memory=False,   # use page-locked (pinned) memory
    drop_last=False     # drop the final incomplete batch?
)

# Iterate over a few batches
for batch_idx, (x_batch, y_batch) in enumerate(dataloader):
    print(f"批次 {batch_idx}: X={x_batch.shape}, y={y_batch.shape}")
    if batch_idx >= 2:
        break

DataLoader 参数详解

| 参数 | 说明 | 量化场景建议 |
| --- | --- | --- |
| batch_size | 批次大小 | 32-256,取决于内存 |
| shuffle | 是否打乱 | False(时序因果性) |
| num_workers | 加载进程数 | 0(Windows)/ 4(Linux) |
| pin_memory | 锁页内存 | True(GPU 训练) |
| drop_last | 丢弃末批次 | True(训练)/ False(验证) |

为什么 shuffle=False?

# ❌ 错误:shuffle=True
# 问题:破坏时序因果性
# 训练时可能用 t=100 预测 t=50
 
# ✅ 正确:shuffle=False
# 保持时间顺序,确保因果性

数据划分

时序划分

import numpy as np
 
def time_series_split(data, train_ratio=0.7, val_ratio=0.15):
    """
    Chronological train/val/test split.

    The three splits are contiguous slices in time order — no shuffling —
    so the test set always lies strictly after the training set.

    Args:
        data: numpy array, shape (n_samples, n_features)
        train_ratio: fraction of rows for training
        val_ratio: fraction of rows for validation

    Returns:
        (train, val, test)
    """
    n = len(data)
    cut_train = int(n * train_ratio)
    cut_val = int(n * (train_ratio + val_ratio))

    train, val, test = data[:cut_train], data[cut_train:cut_val], data[cut_val:]

    print(f"数据划分:")
    print(f"  训练集: {len(train)} 样本 ({train_ratio*100:.0f}%)")
    print(f"  验证集: {len(val)} 样本 ({val_ratio*100:.0f}%)")
    print(f"  测试集: {len(test)} 样本 ({(1-train_ratio-val_ratio)*100:.0f}%)")

    return train, val, test
 
# Example
np.random.seed(42)
data = np.random.randn(1000, 10)

train, val, test = time_series_split(data, train_ratio=0.7, val_ratio=0.15)

# Visualize the split
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 3))
plt.axvspan(0, len(train), alpha=0.3, color='green', label='Train')
plt.axvspan(len(train), len(train)+len(val), alpha=0.3, color='yellow', label='Val')
plt.axvspan(len(train)+len(val), len(data), alpha=0.3, color='red', label='Test')
plt.legend()
plt.title('时序数据划分')
plt.show()

滚动划分(Rolling Split)

用于模拟实盘滚动训练:

def rolling_time_series_split(data, n_splits=5, train_size=0.6, val_size=0.2):
    """
    Rolling (walk-forward) time-series splits.

    Yields several train/val/test partitions whose origin advances by a
    fixed step each fold, mimicking periodic live retraining.

    Args:
        data: numpy array
        n_splits: number of folds to yield
        train_size: training-window size as a fraction of len(data)
        val_size: validation-window size as a fraction of len(data)

    Yields:
        (train, val, test) tuples; the test slice spans the next `step`
        rows after validation, i.e. one roll of the window.
    """
    total = len(data)
    n_train = int(total * train_size)
    n_val = int(total * val_size)

    # How far the window origin advances between consecutive folds.
    step = (total - n_train - n_val) // n_splits

    for fold in range(n_splits):
        lo = fold * step
        mid = lo + n_train
        hi = mid + n_val
        yield data[lo:mid], data[mid:hi], data[hi:hi + step]
 
# Example
np.random.seed(42)
data = np.random.randn(1000, 10)

for fold, (train, val, test) in enumerate(rolling_time_series_split(data, n_splits=3)):
    print(f"\nFold {fold + 1}:")
    print(f"  训练: {train.shape[0]}, 验证: {val.shape[0]}, 测试: {test.shape[0]}")
    print(f"  日期范围: 训练[0:{train.shape[0]}], 验证[{train.shape[0]}:{train.shape[0]+val.shape[0]}]")

完整数据准备流水线

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
 
class QuantDataPipeline:
    """End-to-end data-preparation pipeline for quant models.

    Splits raw data chronologically, normalizes every split using
    statistics from the training split only, and wraps each split in a
    non-shuffling DataLoader.
    """

    def __init__(
        self,
        data,
        seq_len=20,
        horizon=1,
        target_col=0,
        train_ratio=0.7,
        val_ratio=0.15,
        batch_size=32,
        normalize_method='zscore'
    ):
        """
        Args:
            data: numpy array, shape (n_samples, n_features)
            seq_len: input window length
            horizon: number of prediction steps
            target_col: index of the label column
            train_ratio: fraction of rows for training
            val_ratio: fraction of rows for validation
            batch_size: DataLoader batch size
            normalize_method: 'zscore', 'minmax', or anything else for none
        """
        self.seq_len = seq_len
        self.horizon = horizon
        self.target_col = target_col
        self.batch_size = batch_size
        self.normalize_method = normalize_method

        # Split -> normalize -> wrap in loaders, in that order.
        self.train, self.val, self.test = self._split_data(data, train_ratio, val_ratio)
        self.train_norm, self.val_norm, self.test_norm = self._normalize()
        self.train_loader, self.val_loader, self.test_loader = self._create_loaders()

    def _split_data(self, data, train_ratio, val_ratio):
        """Chronological split into contiguous train/val/test slices."""
        total = len(data)
        cut_train = int(total * train_ratio)
        cut_val = int(total * (train_ratio + val_ratio))
        return data[:cut_train], data[cut_train:cut_val], data[cut_val:]

    def _normalize(self):
        """Normalize each split using training-set statistics only."""
        if self.normalize_method == 'zscore':
            center = np.mean(self.train, axis=0)
            spread = np.std(self.train, axis=0)
            spread = np.where(spread == 0, 1, spread)  # guard zero variance
        elif self.normalize_method == 'minmax':
            center = np.min(self.train, axis=0)
            spread = np.max(self.train, axis=0) - center
            spread = np.where(spread == 0, 1, spread)  # guard zero range
        else:
            # Unknown method: pass the data through unchanged.
            return self.train, self.val, self.test

        return tuple((split - center) / spread
                     for split in (self.train, self.val, self.test))

    def _create_loaders(self):
        """Build a non-shuffling DataLoader for each normalized split."""
        loaders = []
        for split in (self.train_norm, self.val_norm, self.test_norm):
            dataset = TimeSeriesDataset(split, self.seq_len, self.horizon, self.target_col)
            # shuffle=False: preserve temporal order (causality).
            loaders.append(DataLoader(dataset, batch_size=self.batch_size, shuffle=False))
        return tuple(loaders)
 
# Usage example
np.random.seed(42)
data = np.random.randn(1000, 10)

pipeline = QuantDataPipeline(
    data=data,
    seq_len=20,
    horizon=1,
    target_col=0,
    train_ratio=0.7,
    val_ratio=0.15,
    batch_size=32,
    normalize_method='zscore'
)

print("数据流水线创建完成!")
print(f"\n训练批次数: {len(pipeline.train_loader)}")
print(f"验证批次数: {len(pipeline.val_loader)}")
print(f"测试批次数: {len(pipeline.test_loader)}")

# Fetch one batch
for x_batch, y_batch in pipeline.train_loader:
    print(f"\n批次形状: X={x_batch.shape}, y={y_batch.shape}")
    break

数据增强(简单介绍)

加噪声

def add_noise(x, noise_level=0.01):
    """
    Gaussian-noise data augmentation.

    Args:
        x: input tensor
        noise_level: standard deviation of the added noise, as a scale on
            unit-variance Gaussian samples

    Returns:
        x plus zero-mean Gaussian noise of the same shape
    """
    return x + torch.randn_like(x) * noise_level
 
# Example
x = torch.randn(10, 5)
x_noisy = add_noise(x, noise_level=0.1)
print(f"原始数据标准差: {x.std():.4f}")
print(f"加噪声后标准差: {x_noisy.std():.4f}")

时间反转(注意因果性)

# ⚠️ 警告:时序预测中慎用时间反转!
# 问题:破坏因果性
 
# 适用场景:
# 1. 分类任务(不是预测)
# 2. 数据增强用于预训练
 
def time_reverse(x):
    """Reverse the sequence (time) axis — dim 1 of a (batch, seq, ...) tensor."""
    return x.flip(1)
 
# 使用建议:
# ❌ 预测任务:不要用
# ✅ 表示学习/预训练:可以使用

核心知识点总结

1. 滑动窗口

# Single-step prediction
X = data[i:i+seq_len]
y = data[i+seq_len]

# Multi-step prediction
y = data[i+seq_len:i+seq_len+horizon]

2. 标准化

# ⚠️ Use training-set statistics only
mean = np.mean(train_data, axis=0)
std = np.std(train_data, axis=0)
train_norm = (train_data - mean) / std
val_norm = (val_data - mean) / std

3. 时序划分

# Split in chronological order — never shuffle
train = data[:train_end]
val = data[train_end:val_end]
test = data[val_end:]

4. DataLoader

# shuffle=False preserves temporal order
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=False  # never shuffle time-series data
)

练习建议

  1. 实现滚动窗口:手动实现一个带滚动窗口的 Dataset
  2. 对比标准化方法:用不同标准化方法训练模型,比较效果
  3. 可视化数据流:绘制数据从原始到训练的完整流程图

下一节

05-模型训练优化.md 中,我们将学习如何训练和优化深度学习模型。