Innovative Applications of Transformers in Time Series Forecasting
Traditional forecasting methods such as ARIMA and LSTM struggle with long-range dependencies. With its self-attention mechanism, the Transformer shows a clear advantage in capturing long-term patterns in time series.
Environment Setup and Data Preparation
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
class TimeSeriesDataset:
    def __init__(self, sequence_length=96, prediction_length=24):
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length
        self.scaler = StandardScaler()

    def load_data(self, file_path):
        """Load the raw time series."""
        data = pd.read_csv(file_path, parse_dates=['timestamp'], index_col='timestamp')
        # Preprocessing
        data = self._handle_missing_values(data)
        features = self._extract_time_features(data)
        return data, features

    def _handle_missing_values(self, data):
        """Fill missing values."""
        # Linear interpolation for interior gaps
        data = data.interpolate(method='linear')
        # Forward-fill anything that remains (fillna(method='ffill') is deprecated)
        data = data.ffill()
        return data

    def _extract_time_features(self, data):
        """Derive calendar features from the datetime index."""
        features = pd.DataFrame(index=data.index)
        features['hour'] = data.index.hour
        features['day_of_week'] = data.index.dayofweek
        features['day_of_month'] = data.index.day
        features['month'] = data.index.month
        features['is_weekend'] = (data.index.dayofweek >= 5).astype(int)
        # Cyclical encodings so that, e.g., 23:00 and 00:00 end up close together
        features['hour_sin'] = np.sin(2 * np.pi * features['hour'] / 24)
        features['hour_cos'] = np.cos(2 * np.pi * features['hour'] / 24)
        features['day_sin'] = np.sin(2 * np.pi * features['day_of_week'] / 7)
        features['day_cos'] = np.cos(2 * np.pi * features['day_of_week'] / 7)
        return features
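Before moving on, here is a minimal usage sketch. The file name energy_load.csv, the value column, and the make_windows helper are illustrative assumptions rather than part of the class above:

# A minimal usage sketch; file name, column name, and helper are assumptions.
def make_windows(values, seq_len=96, pred_len=24):
    """Slice a 1-D array into (input window, target window) training pairs."""
    X, y = [], []
    for i in range(len(values) - seq_len - pred_len + 1):
        X.append(values[i:i + seq_len])
        y.append(values[i + seq_len:i + seq_len + pred_len])
    return np.array(X), np.array(y)

dataset = TimeSeriesDataset(sequence_length=96, prediction_length=24)
data, features = dataset.load_data('energy_load.csv')   # hypothetical file
scaled = dataset.scaler.fit_transform(data[['value']])  # assumes a 'value' column
X, y = make_windows(scaled.ravel())
print(X.shape, y.shape)  # e.g. (n_samples, 96) and (n_samples, 24)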
Building the Time Series Transformer Model
We now design a Transformer architecture tailored to time series data, combining positional encoding, a causal mask, and multi-head attention.
Positional Encoding and Causal Masking
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_length=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_length, d_model)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Shape (1, max_length, d_model) so it broadcasts over batch-first input
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model), matching the encoder's batch_first=True
        return x + self.pe[:, :x.size(1), :]

class CausalMask:
    def __init__(self, seq_len):
        # Additive mask: -inf above the diagonal blocks attention to future steps
        self.mask = torch.triu(torch.ones(seq_len, seq_len) * float('-inf'), diagonal=1)

    def __call__(self):
        return self.mask
class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_dim, d_model=512, nhead=8, num_layers=6,
                 dim_feedforward=2048, dropout=0.1, prediction_length=24):
        super(TimeSeriesTransformer, self).__init__()
        self.d_model = d_model
        self.input_projection = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        # Output heads for single- and multi-step forecasting
        self.output_projection = nn.Linear(d_model, 1)
        self.multi_step_head = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, prediction_length)  # forecast horizon, e.g. 24 steps
        )
        self._init_weights()

    def _init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, src_mask=None):
        # Input projection and positional encoding
        src = self.input_projection(src) * np.sqrt(self.d_model)
        src = self.pos_encoder(src)
        # Transformer encoding
        memory = self.transformer_encoder(src, src_mask)
        # Multi-step forecast from the last encoded position
        multi_step_output = self.multi_step_head(memory[:, -1, :])
        # Per-step prediction (used as an auxiliary signal during training)
        single_step_output = self.output_projection(memory)
        return single_step_output, multi_step_output
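A quick smoke test helps confirm the tensor shapes; the dimensions below are arbitrary, and the causal mask is passed through the src_mask argument:

# Shape check with random data (a sketch; all dimensions are illustrative).
model = TimeSeriesTransformer(input_dim=10, d_model=128, nhead=4, num_layers=2)
src = torch.randn(8, 96, 10)          # (batch, seq_len, input_dim)
mask = CausalMask(seq_len=96)()       # additive mask, -inf above the diagonal
single_out, multi_out = model(src, src_mask=mask)
print(single_out.shape)  # torch.Size([8, 96, 1]) - one value per encoded step
print(multi_out.shape)   # torch.Size([8, 24]) - the 24-step-ahead forecast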
Advanced Feature Engineering and Data Augmentation
Playing to the structure of time series data, we implement rolling windows, seasonal decomposition, anomaly detection, and other advanced feature engineering techniques.
class AdvancedFeatureEngineer:
    def __init__(self, window_sizes=[24, 168]):  # 24 hours and 7 days
        self.window_sizes = window_sizes

    def add_rolling_features(self, data):
        """Add rolling-window statistics."""
        df = data.copy()
        for window in self.window_sizes:
            # Summary statistics
            df[f'rolling_mean_{window}'] = df['value'].rolling(window=window).mean()
            df[f'rolling_std_{window}'] = df['value'].rolling(window=window).std()
            df[f'rolling_min_{window}'] = df['value'].rolling(window=window).min()
            df[f'rolling_max_{window}'] = df['value'].rolling(window=window).max()
            # Rate of change across the window
            df[f'rolling_change_{window}'] = df['value'].pct_change(periods=window)
        return df

    def seasonal_decomposition(self, data, period=24):
        """Additive seasonal decomposition."""
        from statsmodels.tsa.seasonal import seasonal_decompose
        decomposition = seasonal_decompose(data['value'], model='additive', period=period)
        data['trend'] = decomposition.trend
        data['seasonal'] = decomposition.seasonal
        data['residual'] = decomposition.resid
        return data

    def detect_anomalies(self, data, method='iqr'):
        """Flag outliers."""
        if method == 'iqr':
            Q1 = data['value'].quantile(0.25)
            Q3 = data['value'].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            anomalies = (data['value'] < lower_bound) | (data['value'] > upper_bound)
            data['is_anomaly'] = anomalies.astype(int)
        return data

    def create_lagged_features(self, data, lags=[1, 2, 3, 24, 48]):
        """Add lagged copies of the target."""
        for lag in lags:
            data[f'lag_{lag}'] = data['value'].shift(lag)
        return data
class DataAugmentor:
    """Time series data augmentation."""
    @staticmethod
    def add_noise(series, noise_level=0.01):
        """Add Gaussian noise scaled to the series' standard deviation."""
        noise = np.random.normal(0, noise_level * np.std(series), len(series))
        return series + noise

    @staticmethod
    def time_warp(series, warp_factor=0.1):
        """Time-warping augmentation via cubic-spline resampling."""
        from scipy.interpolate import CubicSpline
        original_indices = np.arange(len(series))
        warp_indices = original_indices + np.random.normal(0, warp_factor, len(series))
        warp_indices = np.clip(warp_indices, 0, len(series) - 1)
        cs = CubicSpline(original_indices, series)
        return cs(warp_indices)

    @staticmethod
    def scale_augmentation(series, scale_range=(0.8, 1.2)):
        """Multiply the series by a random scale factor."""
        scale_factor = np.random.uniform(scale_range[0], scale_range[1])
        return series * scale_factor
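The following sketch runs the feature engineer and augmentor end-to-end on a synthetic hourly sine wave (made-up data, purely for illustration):

# Illustrative end-to-end use on a synthetic hourly series.
rng = pd.date_range('2024-01-01', periods=24 * 60, freq='h')
df = pd.DataFrame({'value': np.sin(np.arange(len(rng)) * 2 * np.pi / 24) +
                            np.random.normal(0, 0.1, len(rng))}, index=rng)

engineer = AdvancedFeatureEngineer()
df = engineer.add_rolling_features(df)
df = engineer.create_lagged_features(df)
df = engineer.seasonal_decomposition(df, period=24)
df = engineer.detect_anomalies(df)
df = df.dropna()  # rolling/lag features leave NaNs at the start

augmented = DataAugmentor.add_noise(df['value'].values)  # one augmented copy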
Model Training and Hyperparameter Optimization
We wire up the training loop with a multi-task loss, early stopping, and Bayesian hyperparameter optimization to get the best performance from the model.
class ModelTrainer:
    def __init__(self, model, device, criterion, optimizer):
        self.model = model.to(device)
        self.device = device
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_losses = []
        self.val_losses = []

    def train_epoch(self, train_loader):
        self.model.train()
        total_loss = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()
            single_output, multi_output = self.model(data)
            # Multi-task loss: per-step predictions plus the 24-step forecast
            single_loss = self.criterion(single_output.squeeze(-1), target)
            multi_loss = self.criterion(multi_output, target[:, -24:])
            loss = 0.7 * single_loss + 0.3 * multi_loss
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            total_loss += loss.item()
        return total_loss / len(train_loader)

    def validate(self, val_loader):
        self.model.eval()
        total_loss = 0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(self.device), target.to(self.device)
                single_output, multi_output = self.model(data)
                loss = self.criterion(multi_output, target[:, -24:])
                total_loss += loss.item()
        return total_loss / len(val_loader)
class HyperparameterOptimizer:
    def __init__(self, search_space):
        self.search_space = search_space

    def bayesian_optimization(self, objective_function, n_trials=100):
        import optuna

        def objective(trial):
            # Sample hyperparameters
            lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
            d_model = trial.suggest_categorical('d_model', [128, 256, 512])
            nhead = trial.suggest_categorical('nhead', [4, 8, 16])
            num_layers = trial.suggest_int('num_layers', 2, 8)
            dropout = trial.suggest_float('dropout', 0.1, 0.5)
            # Train the model and return the validation loss
            val_loss = objective_function(lr, d_model, nhead, num_layers, dropout)
            return val_loss

        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=n_trials)
        return study.best_params
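Wiring this up looks roughly like the sketch below; train_and_eval is a hypothetical helper that trains a model with the sampled hyperparameters and returns its validation loss:

# Sketch only: `train_and_eval` is a hypothetical helper, not defined above.
def train_and_eval(lr, d_model, nhead, num_layers, dropout):
    model = TimeSeriesTransformer(input_dim=10, d_model=d_model, nhead=nhead,
                                  num_layers=num_layers, dropout=dropout)
    # ... build DataLoaders, train a few epochs with Adam(lr=lr) ...
    return 0.0  # placeholder: return the real validation loss here

optimizer = HyperparameterOptimizer(search_space=None)  # space is defined inline above
best_params = optimizer.bayesian_optimization(train_and_eval, n_trials=20)
print(best_params)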
class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.delta = delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score + self.delta:
            # No sufficient improvement: count down towards stopping
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0
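Put together, a training loop might look like this sketch, where train_loader and val_loader are assumed DataLoaders yielding (batch, seq_len, features) inputs and (batch, seq_len) targets:

# Minimal training loop tying the pieces together (loaders are assumptions).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TimeSeriesTransformer(input_dim=10)
trainer = ModelTrainer(model, device, criterion=nn.MSELoss(),
                       optimizer=torch.optim.Adam(model.parameters(), lr=1e-4))
stopper = EarlyStopping(patience=10)

for epoch in range(100):
    train_loss = trainer.train_epoch(train_loader)  # train_loader assumed defined
    val_loss = trainer.validate(val_loader)         # val_loader assumed defined
    trainer.train_losses.append(train_loss)
    trainer.val_losses.append(val_loss)
    stopper(val_loss)
    if stopper.early_stop:
        print(f'Early stopping at epoch {epoch}')
        break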
Model Interpretability and Explainable AI
We add attention visualization, feature importance analysis, and prediction uncertainty estimation to make the model easier to interpret.
class ModelInterpreter:
    def __init__(self, model, device):
        self.model = model
        self.device = device

    def visualize_attention(self, input_sequence):
        """Visualize attention weights of the first encoder layer."""
        self.model.eval()
        with torch.no_grad():
            # Project the input the same way the model's forward pass does
            input_proj = self.model.input_projection(input_sequence)
            input_proj = self.model.pos_encoder(input_proj)
            # Query the first encoder layer's self-attention directly;
            # average_attn_weights=False keeps the per-head weights
            encoder_layer = self.model.transformer_encoder.layers[0]
            attn_output, attn_weights = encoder_layer.self_attn(
                input_proj, input_proj, input_proj,
                attn_mask=None, need_weights=True, average_attn_weights=False
            )
            # Average over heads: (batch, heads, L, L) -> (L, L)
            avg_attn = attn_weights.mean(dim=1).squeeze(0).cpu().numpy()
        # The map is over time positions (rows: queries, columns: keys)
        plt.figure(figsize=(12, 8))
        plt.imshow(avg_attn, cmap='hot', interpolation='nearest')
        plt.colorbar()
        plt.xlabel('Key position (time step)')
        plt.ylabel('Query position (time step)')
        plt.title('Attention Weights Heatmap')
        plt.tight_layout()
        plt.show()
        return avg_attn
    def feature_importance_analysis(self, dataloader, n_shuffles=100):
        """Permutation-based feature importance."""
        # `evaluate_model` and `shuffle_feature` are assumed helpers: the first
        # scores the model on a loader, the second returns a copy of the loader
        # with one feature column permuted across samples.
        baseline_score = self.evaluate_model(dataloader)
        feature_importance = {}
        for feature_idx in range(dataloader.dataset[0][0].shape[-1]):
            shuffled_scores = []
            for _ in range(n_shuffles):
                # Re-score the model with this feature permuted
                shuffled_loader = self.shuffle_feature(dataloader, feature_idx)
                score = self.evaluate_model(shuffled_loader)
                shuffled_scores.append(score)
            # The drop from baseline measures how much the model relies on the feature
            importance = baseline_score - np.mean(shuffled_scores)
            feature_importance[feature_idx] = importance
        return feature_importance
    def prediction_uncertainty(self, input_sequence, n_samples=100):
        """Estimate prediction uncertainty with MC Dropout."""
        self.model.train()  # keep dropout active; no_grad still disables autograd
        predictions = []
        with torch.no_grad():
            for _ in range(n_samples):
                _, multi_output = self.model(input_sequence.unsqueeze(0))
                predictions.append(multi_output.cpu().numpy())
        predictions = np.array(predictions).squeeze()
        mean_prediction = predictions.mean(axis=0)
        std_prediction = predictions.std(axis=0)
        return mean_prediction, std_prediction
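Turning those statistics into a rough 95% interval is straightforward; here model, device, and the scaled input window x of shape (seq_len, features) are assumed to exist:

# Approximate 95% interval from MC Dropout samples (sketch; `x` is assumed).
interpreter = ModelInterpreter(model, device)
mean_pred, std_pred = interpreter.prediction_uncertainty(x.to(device), n_samples=100)
lower, upper = mean_pred - 1.96 * std_pred, mean_pred + 1.96 * std_pred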
class ModelEvaluator:
    def __init__(self):
        self.metrics = {}

    def calculate_metrics(self, y_true, y_pred):
        """Compute a set of standard regression metrics."""
        from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
        mae = mean_absolute_error(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_true, y_pred)
        # MAPE; the small epsilon guards against division by zero
        mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
        metrics = {
            'MAE': mae,
            'MSE': mse,
            'RMSE': rmse,
            'R2': r2,
            'MAPE': mape
        }
        return metrics

    def plot_predictions(self, y_true, y_pred, timestamps):
        """Plot predictions against the ground truth."""
        plt.figure(figsize=(15, 6))
        plt.plot(timestamps, y_true, label='True Values', alpha=0.7)
        plt.plot(timestamps, y_pred, label='Predictions', alpha=0.7)
        plt.fill_between(timestamps,
                         y_pred - np.std(y_pred - y_true),
                         y_pred + np.std(y_pred - y_true),
                         alpha=0.2, label='Uncertainty')
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.title('Time Series Prediction Results')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
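For instance, on a pair of dummy arrays:

# Example metric computation (illustrative values only).
evaluator = ModelEvaluator()
y_true = np.array([10.0, 12.0, 11.5, 13.2])
y_pred = np.array([10.3, 11.8, 11.9, 12.7])
print(evaluator.calculate_metrics(y_true, y_pred))
# -> a dict with MAE, MSE, RMSE, R2, and MAPE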
Production Deployment and Monitoring
We implement model serving, performance monitoring, and an automatic retraining pipeline to keep the system stable in production.
class ModelServer:
    def __init__(self, model_path, device):
        self.device = device
        self.model = self.load_model(model_path)
        # `load_scaler` and `feature_engineer` are assumed to restore the same
        # preprocessing objects used at training time (e.g. from pickle files)
        self.scaler = self.load_scaler()

    def load_model(self, model_path):
        """Load the trained model weights."""
        model = TimeSeriesTransformer(input_dim=50)  # match the training feature count
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        model.eval()
        return model

    def preprocess_request(self, request_data):
        """Turn a raw request payload into a model-ready tensor."""
        df = pd.DataFrame(request_data)
        features = self.feature_engineer.transform(df)
        scaled_features = self.scaler.transform(features)
        tensor_data = torch.FloatTensor(scaled_features).unsqueeze(0)
        return tensor_data.to(self.device)

    def predict(self, input_data):
        """Run inference and package the result."""
        with torch.no_grad():
            single_output, multi_output = self.model(input_data)
        predictions = {
            'single_step': single_output.cpu().numpy().tolist(),
            'multi_step': multi_output.cpu().numpy().tolist(),
            'timestamp': pd.Timestamp.now().isoformat()
        }
        return predictions
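One way to expose ModelServer over HTTP is a thin FastAPI wrapper; FastAPI is our choice here, not a requirement of the design, and the checkpoint path is hypothetical:

# Hypothetical HTTP wrapper around ModelServer using FastAPI (Flask or gRPC
# would work equally well).
from fastapi import FastAPI

app = FastAPI()
server = ModelServer('model.pt', torch.device('cpu'))  # hypothetical checkpoint path

@app.post('/predict')
def predict_endpoint(payload: dict):
    tensor_data = server.preprocess_request(payload['records'])
    return server.predict(tensor_data)
# Run with: uvicorn serve:app --port 8000  (assuming this file is serve.py)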
class PerformanceMonitor:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.latencies = []
        self.predictions = []
        self.actuals = []

    def update_metrics(self, latency, prediction, actual=None):
        """Record one request's latency and, when available, its outcome."""
        self.latencies.append(latency)
        if actual is not None:
            self.predictions.append(prediction)
            self.actuals.append(actual)
        # Keep only the most recent window; the lists are trimmed independently
        # because actuals arrive for only a subset of requests
        if len(self.latencies) > self.window_size:
            self.latencies.pop(0)
        if len(self.predictions) > self.window_size:
            self.predictions.pop(0)
            self.actuals.pop(0)

    def get_performance_report(self):
        """Summarize recent serving performance."""
        # `calculate_accuracy` is an assumed helper comparing predictions to actuals
        report = {
            'avg_latency': np.mean(self.latencies),
            'p95_latency': np.percentile(self.latencies, 95),
            'throughput': len(self.latencies) / 60,  # requests/min, assuming the window spans one hour
            'current_accuracy': self.calculate_accuracy() if self.actuals else None
        }
        return report
class AutoRetrainSystem:
    def __init__(self, model_path, data_path, retrain_interval=24):
        self.model_path = model_path
        self.data_path = data_path
        self.retrain_interval = retrain_interval  # hours

    def check_retrain_condition(self):
        """Decide whether retraining is needed."""
        # Triggered by performance degradation or data drift; both checks are
        # assumed helpers (e.g. rolling-error thresholds, drift tests on inputs)
        performance_degraded = self.monitor_performance_degradation()
        data_drift_detected = self.detect_data_drift()
        return performance_degraded or data_drift_detected

    def trigger_retraining(self):
        """Run the retraining workflow."""
        # 1. Gather fresh data
        new_data = self.collect_new_data()
        # 2. Incremental or full retraining, depending on the situation
        if self.should_incremental_train():
            self.incremental_training(new_data)
        else:
            self.full_training(new_data)
        # 3. Validate the candidate model, then promote it
        if self.validate_new_model():
            self.deploy_new_model()
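A simple polling loop can drive the system, though in practice a cron job or workflow engine such as Airflow would own the schedule; the sketch below assumes the helper methods above are implemented:

# Polling scheduler sketch for the retraining system.
import time

retrainer = AutoRetrainSystem('model.pt', 'data/', retrain_interval=24)
while True:
    if retrainer.check_retrain_condition():
        retrainer.trigger_retraining()
    time.sleep(retrainer.retrain_interval * 3600)  # interval is in hours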
Summary and Best Practices
In this tutorial we built a complete Transformer-based time series forecasting system with the following core features:
- Modern architecture: a Transformer encoder that effectively captures long-range dependencies
- Rich features: calendar features, rolling-window statistics, seasonal decomposition, and other multi-dimensional feature engineering
- Smart training: multi-task learning, early stopping, and Bayesian hyperparameter optimization
- Interpretability: attention visualization, feature importance analysis, and uncertainty estimation
- Production-ready: model serving, performance monitoring, and automatic retraining
Key technical points:
- Use a causal mask so predictions never see future information
- Combine single-step and multi-step prediction tasks to improve generalization
- Apply a broad set of data augmentation strategies to cope with scarce data
- Quantify uncertainty with MC Dropout
- Build an automated monitoring and retraining pipeline
This solution applies broadly to financial forecasting, energy demand forecasting, sales forecasting, and many other domains, giving time series analysis a powerful deep learning toolkit.