Python机器学习实战:基于Transformer的时间序列预测系统 | AI技术应用

2025-10-09 0 128

Transformer在时间序列预测中的创新应用

传统时间序列预测方法如ARIMA、LSTM在处理长期依赖关系时存在局限,Transformer模型凭借其自注意力机制,在捕捉时间序列长期模式方面展现出显著优势。

环境配置与数据准备

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

class TimeSeriesDataset:
    def __init__(self, sequence_length=96, prediction_length=24):
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length
        self.scaler = StandardScaler()
        
    def load_data(self, file_path):
        """加载时间序列数据"""
        data = pd.read_csv(file_path, parse_dates=['timestamp'], index_col='timestamp')
        
        # 数据预处理
        data = self._handle_missing_values(data)
        features = self._extract_time_features(data)
        
        return data, features
    
    def _handle_missing_values(self, data):
        """处理缺失值"""
        # 线性插值填充缺失值
        data = data.interpolate(method='linear')
        # 前向填充剩余缺失值
        data = data.fillna(method='ffill')
        return data
    
    def _extract_time_features(self, data):
        """提取时间特征"""
        features = pd.DataFrame(index=data.index)
        features['hour'] = data.index.hour
        features['day_of_week'] = data.index.dayofweek
        features['day_of_month'] = data.index.day
        features['month'] = data.index.month
        features['is_weekend'] = (data.index.dayofweek >= 5).astype(int)
        
        # 周期性编码
        features['hour_sin'] = np.sin(2 * np.pi * features['hour'] / 24)
        features['hour_cos'] = np.cos(2 * np.pi * features['hour'] / 24)
        features['day_sin'] = np.sin(2 * np.pi * features['day_of_week'] / 7)
        features['day_cos'] = np.cos(2 * np.pi * features['day_of_week'] / 7)
        
        return features

构建时间序列Transformer模型

我们将设计一个专门针对时间序列数据特性的Transformer架构,包含位置编码、因果掩码和多头注意力机制。

位置编码与因果掩码

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_length=5000):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_length, d_model)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        return x + self.pe[:x.size(0), :]

class CausalMask:
    def __init__(self, seq_len):
        self.mask = torch.triu(torch.ones(seq_len, seq_len) * float('-inf'), diagonal=1)
    
    def __call__(self):
        return self.mask

class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_dim, d_model=512, nhead=8, num_layers=6, 
                 dim_feedforward=2048, dropout=0.1):
        super(TimeSeriesTransformer, self).__init__()
        
        self.d_model = d_model
        self.input_projection = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        
        # 多步预测输出层
        self.output_projection = nn.Linear(d_model, 1)
        self.multi_step_head = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 24)  # 预测24个时间步
        )
        
        self._init_weights()
    
    def _init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)
    
    def forward(self, src, src_mask=None):
        # 输入投影和位置编码
        src = self.input_projection(src) * np.sqrt(self.d_model)
        src = self.pos_encoder(src)
        
        # Transformer编码
        memory = self.transformer_encoder(src, src_mask)
        
        # 多步预测
        multi_step_output = self.multi_step_head(memory[:, -1, :])
        
        # 单步预测(用于训练验证)
        single_step_output = self.output_projection(memory)
        
        return single_step_output, multi_step_output

高级特征工程与数据增强

针对时间序列数据的特点,实现滑动窗口、季节性分解和异常检测等高级特征工程技术。

class AdvancedFeatureEngineer:
    def __init__(self, window_sizes=[24, 168]):  # 24小时,7天
        self.window_sizes = window_sizes
        
    def add_rolling_features(self, data):
        """添加滑动窗口特征"""
        df = data.copy()
        
        for window in self.window_sizes:
            # 统计特征
            df[f'rolling_mean_{window}'] = df['value'].rolling(window=window).mean()
            df[f'rolling_std_{window}'] = df['value'].rolling(window=window).std()
            df[f'rolling_min_{window}'] = df['value'].rolling(window=window).min()
            df[f'rolling_max_{window}'] = df['value'].rolling(window=window).max()
            
            # 变化率特征
            df[f'rolling_change_{window}'] = df['value'].pct_change(periods=window)
            
        return df
    
    def seasonal_decomposition(self, data, period=24):
        """季节性分解"""
        from statsmodels.tsa.seasonal import seasonal_decompose
        
        decomposition = seasonal_decompose(data['value'], model='additive', period=period)
        
        data['trend'] = decomposition.trend
        data['seasonal'] = decomposition.seasonal
        data['residual'] = decomposition.resid
        
        return data
    
    def detect_anomalies(self, data, method='iqr'):
        """异常检测"""
        if method == 'iqr':
            Q1 = data['value'].quantile(0.25)
            Q3 = data['value'].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            anomalies = (data['value']  upper_bound)
            data['is_anomaly'] = anomalies.astype(int)
            
        return data
    
    def create_lagged_features(self, data, lags=[1, 2, 3, 24, 48]):
        """创建滞后特征"""
        for lag in lags:
            data[f'lag_{lag}'] = data['value'].shift(lag)
            
        return data

class DataAugmentor:
    """时间序列数据增强"""
    
    @staticmethod
    def add_noise(series, noise_level=0.01):
        """添加高斯噪声"""
        noise = np.random.normal(0, noise_level * np.std(series), len(series))
        return series + noise
    
    @staticmethod
    def time_warp(series, warp_factor=0.1):
        """时间扭曲增强"""
        from scipy.interpolate import CubicSpline
        
        original_indices = np.arange(len(series))
        warp_indices = original_indices + np.random.normal(0, warp_factor, len(series))
        warp_indices = np.clip(warp_indices, 0, len(series)-1)
        
        cs = CubicSpline(original_indices, series)
        return cs(warp_indices)
    
    @staticmethod
    def scale_augmentation(series, scale_range=(0.8, 1.2)):
        """尺度变换增强"""
        scale_factor = np.random.uniform(scale_range[0], scale_range[1])
        return series * scale_factor

模型训练与超参数优化

实现分布式训练、早停机制和贝叶斯超参数优化,确保模型性能最优。

class ModelTrainer:
    def __init__(self, model, device, criterion, optimizer):
        self.model = model.to(device)
        self.device = device
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_losses = []
        self.val_losses = []
        
    def train_epoch(self, train_loader):
        self.model.train()
        total_loss = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(self.device), target.to(self.device)
            
            self.optimizer.zero_grad()
            single_output, multi_output = self.model(data)
            
            # 多任务损失
            single_loss = self.criterion(single_output.squeeze(), target)
            multi_loss = self.criterion(multi_output, target[:, -24:])
            loss = 0.7 * single_loss + 0.3 * multi_loss
            
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            
            total_loss += loss.item()
            
        return total_loss / len(train_loader)
    
    def validate(self, val_loader):
        self.model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(self.device), target.to(self.device)
                single_output, multi_output = self.model(data)
                
                loss = self.criterion(multi_output, target[:, -24:])
                total_loss += loss.item()
                
        return total_loss / len(val_loader)

class HyperparameterOptimizer:
    def __init__(self, search_space):
        self.search_space = search_space
        
    def bayesian_optimization(self, objective_function, n_trials=100):
        import optuna
        
        def objective(trial):
            # 超参数采样
            lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
            d_model = trial.suggest_categorical('d_model', [128, 256, 512])
            nhead = trial.suggest_categorical('nhead', [4, 8, 16])
            num_layers = trial.suggest_int('num_layers', 2, 8)
            dropout = trial.suggest_float('dropout', 0.1, 0.5)
            
            # 训练模型并返回验证损失
            val_loss = objective_function(lr, d_model, nhead, num_layers, dropout)
            return val_loss
        
        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=n_trials)
        
        return study.best_params

class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.delta = delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        
    def __call__(self, val_loss):
        score = -val_loss
        
        if self.best_score is None:
            self.best_score = score
        elif score = self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0

模型解释性与可解释AI

实现注意力可视化、特征重要性分析和预测不确定性估计,增强模型的可解释性。

class ModelInterpreter:
    def __init__(self, model, device):
        self.model = model
        self.device = device
        
    def visualize_attention(self, input_sequence, feature_names):
        """可视化注意力权重"""
        self.model.eval()
        
        with torch.no_grad():
            # 获取注意力权重
            input_proj = self.model.input_projection(input_sequence)
            input_proj = self.model.pos_encoder(input_proj)
            
            # 通过第一个编码器层获取注意力
            encoder_layer = self.model.transformer_encoder.layers[0]
            attn_output, attn_weights = encoder_layer.self_attn(
                input_proj, input_proj, input_proj,
                attn_mask=None, need_weights=True
            )
            
            # 平均多个头的注意力权重
            avg_attn = attn_weights.mean(dim=1).squeeze().cpu().numpy()
            
            # 绘制热力图
            plt.figure(figsize=(12, 8))
            plt.imshow(avg_attn, cmap='hot', interpolation='nearest')
            plt.colorbar()
            plt.xticks(range(len(feature_names)), feature_names, rotation=45)
            plt.yticks(range(len(feature_names)), feature_names)
            plt.title('Attention Weights Heatmap')
            plt.tight_layout()
            plt.show()
            
        return avg_attn
    
    def feature_importance_analysis(self, dataloader, n_shuffles=100):
        """基于排列的特征重要性分析"""
        baseline_score = self.evaluate_model(dataloader)
        feature_importance = {}
        
        for feature_idx in range(dataloader.dataset[0][0].shape[-1]):
            shuffled_scores = []
            
            for _ in range(n_shuffles):
                # 创建数据副本并打乱特定特征
                shuffled_loader = self.shuffle_feature(dataloader, feature_idx)
                score = self.evaluate_model(shuffled_loader)
                shuffled_scores.append(score)
            
            importance = baseline_score - np.mean(shuffled_scores)
            feature_importance[feature_idx] = importance
            
        return feature_importance
    
    def prediction_uncertainty(self, input_sequence, n_samples=100):
        """使用MC Dropout估计预测不确定性"""
        self.model.train()  # 保持dropout开启
        
        predictions = []
        
        with torch.no_grad():
            for _ in range(n_samples):
                _, multi_output = self.model(input_sequence.unsqueeze(0))
                predictions.append(multi_output.cpu().numpy())
        
        predictions = np.array(predictions).squeeze()
        mean_prediction = predictions.mean(axis=0)
        std_prediction = predictions.std(axis=0)
        
        return mean_prediction, std_prediction

class ModelEvaluator:
    def __init__(self):
        self.metrics = {}
    
    def calculate_metrics(self, y_true, y_pred):
        """计算多种评估指标"""
        from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
        
        mae = mean_absolute_error(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_true, y_pred)
        
        # MAPE (Mean Absolute Percentage Error)
        mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
        
        metrics = {
            'MAE': mae,
            'MSE': mse,
            'RMSE': rmse,
            'R2': r2,
            'MAPE': mape
        }
        
        return metrics
    
    def plot_predictions(self, y_true, y_pred, timestamps):
        """绘制预测结果对比图"""
        plt.figure(figsize=(15, 6))
        
        plt.plot(timestamps, y_true, label='True Values', alpha=0.7)
        plt.plot(timestamps, y_pred, label='Predictions', alpha=0.7)
        plt.fill_between(timestamps, 
                        y_pred - np.std(y_pred - y_true),
                        y_pred + np.std(y_pred - y_true),
                        alpha=0.2, label='Uncertainty')
        
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.title('Time Series Prediction Results')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

生产环境部署与监控

实现模型服务化、性能监控和自动重训练 pipeline,确保系统稳定运行。

class ModelServer:
    def __init__(self, model_path, device):
        self.device = device
        self.model = self.load_model(model_path)
        self.scaler = self.load_scaler()
        
    def load_model(self, model_path):
        """加载训练好的模型"""
        model = TimeSeriesTransformer(input_dim=50)  # 根据实际特征维度调整
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        model.eval()
        return model
    
    def preprocess_request(self, request_data):
        """预处理请求数据"""
        df = pd.DataFrame(request_data)
        features = self.feature_engineer.transform(df)
        scaled_features = self.scaler.transform(features)
        tensor_data = torch.FloatTensor(scaled_features).unsqueeze(0)
        
        return tensor_data.to(self.device)
    
    def predict(self, input_data):
        """模型预测"""
        with torch.no_grad():
            single_output, multi_output = self.model(input_data)
            
        predictions = {
            'single_step': single_output.cpu().numpy().tolist(),
            'multi_step': multi_output.cpu().numpy().tolist(),
            'timestamp': pd.Timestamp.now().isoformat()
        }
        
        return predictions

class PerformanceMonitor:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.latencies = []
        self.predictions = []
        self.actuals = []
        
    def update_metrics(self, latency, prediction, actual=None):
        """更新性能指标"""
        self.latencies.append(latency)
        if actual is not None:
            self.predictions.append(prediction)
            self.actuals.append(actual)
            
        # 保持窗口大小
        if len(self.latencies) > self.window_size:
            self.latencies.pop(0)
            self.predictions.pop(0)
            self.actuals.pop(0)
    
    def get_performance_report(self):
        """生成性能报告"""
        report = {
            'avg_latency': np.mean(self.latencies),
            'p95_latency': np.percentile(self.latencies, 95),
            'throughput': len(self.latencies) / 60,  # 每分钟请求数
            'current_accuracy': self.calculate_accuracy() if self.actuals else None
        }
        
        return report

class AutoRetrainSystem:
    def __init__(self, model_path, data_path, retrain_interval=24):
        self.model_path = model_path
        self.data_path = data_path
        self.retrain_interval = retrain_interval  # 小时
        
    def check_retrain_condition(self):
        """检查是否需要重新训练"""
        # 基于模型性能下降或数据漂移
        performance_degraded = self.monitor_performance_degradation()
        data_drift_detected = self.detect_data_drift()
        
        return performance_degraded or data_drift_detected
    
    def trigger_retraining(self):
        """触发重新训练流程"""
        # 1. 准备新数据
        new_data = self.collect_new_data()
        
        # 2. 增量训练或全量训练
        if self.should_incremental_train():
            self.incremental_training(new_data)
        else:
            self.full_training(new_data)
        
        # 3. 模型验证和部署
        if self.validate_new_model():
            self.deploy_new_model()

总结与最佳实践

通过本教程,我们构建了一个完整的基于Transformer的时间序列预测系统,具备以下核心特性:

  • 先进架构:基于Transformer的编码器架构,有效捕捉长期依赖关系
  • 丰富特征:时间特征、滑动窗口统计、季节性分解等多维度特征工程
  • 智能训练:多任务学习、早停机制、贝叶斯超参数优化
  • 可解释性:注意力可视化、特征重要性分析、不确定性估计
  • 生产就绪:模型服务化、性能监控、自动重训练

关键技术要点

  1. 使用因果掩码确保预测不会看到未来信息
  2. 结合单步和多步预测任务提升模型泛化能力
  3. 实现全面的数据增强策略应对数据稀缺问题
  4. 采用MC Dropout进行不确定性量化
  5. 建立自动化监控和重训练pipeline

该解决方案可广泛应用于金融预测、能源需求预测、销量预测等多个领域,为时间序列分析提供了强大的深度学习工具。

Python机器学习实战:基于Transformer的时间序列预测系统 | AI技术应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 thinkphp Python机器学习实战:基于Transformer的时间序列预测系统 | AI技术应用 https://www.taomawang.com/server/thinkphp/1184.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务