PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级应用教程

2026-02-23 0 187
免费资源下载

原创技术教程 | 更新时间:2023年10月

一、异步任务队列的核心价值

在现代Web应用中,同步处理耗时任务会导致请求阻塞、用户体验下降。异步任务队列通过解耦任务触发与执行,实现:

  • 提升响应速度:立即返回响应,后台异步处理
  • 提高系统吞吐量:避免进程阻塞,充分利用资源
  • 实现延迟执行:支持定时任务和延迟任务
  • 增强系统可靠性:任务持久化,失败重试机制

本文将构建一个基于Redis的分布式延迟任务系统,支持精确到秒的延迟执行和任务优先级管理。

二、系统架构设计

2.1 核心组件

┌─────────────────┐    ┌──────────────┐    ┌──────────────┐
│  任务生产者     │───▶│ Redis队列    │───▶│  任务消费者  │
│  (Producer)     │    │  (Sorted Set)│    │  (Consumer)  │
└─────────────────┘    └──────────────┘    └──────────────┘
        │                       │                       │
        │                       │                       │
        ▼                       ▼                       ▼
┌─────────────────┐    ┌──────────────┐    ┌──────────────┐
│  Web应用/API    │    │  延迟队列    │    │  工作进程    │
│                 │    │  (ZSET)      │    │  (Worker)    │
└─────────────────┘    └──────────────┘    └──────────────┘

2.2 数据结构设计

使用Redis有序集合(ZSET)存储延迟任务:

  • Score:任务执行时间戳(Unix timestamp)
  • Member:JSON格式的任务数据
  • 额外Hash存储:任务元信息和重试次数

三、核心代码实现

3.1 任务队列管理器

<?php
/**
 * Redis延迟任务队列管理器
 */
class RedisDelayQueue
{
    private $redis;
    private $queueKey = 'delay_queue:tasks';
    private $metaKeyPrefix = 'delay_queue:meta:';
    
    public function __construct($config = [])
    {
        $this->redis = new Redis();
        $this->redis->connect(
            $config['host'] ?? '127.0.0.1',
            $config['port'] ?? 6379
        );
        
        if (!empty($config['password'])) {
            $this->redis->auth($config['password']);
        }
        
        if (!empty($config['database'])) {
            $this->redis->select($config['database']);
        }
    }
    
    /**
     * 添加延迟任务
     * @param string $taskId 任务ID
     * @param array $data 任务数据
     * @param int $delaySeconds 延迟秒数
     * @param int $priority 优先级(1-10)
     * @return bool
     */
    public function addTask($taskId, $data, $delaySeconds, $priority = 5)
    {
        $executeTime = time() + $delaySeconds;
        $adjustedTime = $executeTime - ($priority - 1);
        
        $task = [
            'id' => $taskId,
            'data' => $data,
            'created_at' => time(),
            'priority' => $priority,
            'retry_count' => 0
        ];
        
        // 存储到有序集合
        $result = $this->redis->zAdd(
            $this->queueKey,
            $adjustedTime,
            json_encode($task)
        );
        
        if ($result) {
            // 存储元数据
            $metaKey = $this->metaKeyPrefix . $taskId;
            $this->redis->hMSet($metaKey, [
                'status' => 'pending',
                'max_retries' => 3,
                'current_retries' => 0
            ]);
            $this->redis->expire($metaKey, 86400 * 7);
        }
        
        return $result;
    }
    
    /**
     * 获取待执行任务
     * @return array|null
     */
    public function getDueTasks($limit = 10)
    {
        $now = time();
        $tasks = [];
        
        // 获取score小于等于当前时间的任务
        $items = $this->redis->zRangeByScore(
            $this->queueKey,
            0,
            $now,
            ['limit' => [0, $limit]]
        );
        
        foreach ($items as $item) {
            $task = json_decode($item, true);
            if ($task) {
                $tasks[] = $task;
                // 从有序集合中移除
                $this->redis->zRem($this->queueKey, $item);
            }
        }
        
        return $tasks;
    }
    
    /**
     * 更新任务状态
     */
    public function updateTaskStatus($taskId, $status, $error = null)
    {
        $metaKey = $this->metaKeyPrefix . $taskId;
        
        $updateData = ['status' => $status];
        if ($error) {
            $updateData['last_error'] = $error;
            $updateData['failed_at'] = time();
        }
        
        if ($status === 'processing') {
            $updateData['started_at'] = time();
        } elseif ($status === 'completed') {
            $updateData['completed_at'] = time();
        }
        
        return $this->redis->hMSet($metaKey, $updateData);
    }
}
?>

3.2 任务处理器基类

<?php
/**
 * 抽象任务处理器
 */
abstract class TaskHandler
{
    protected $maxRetries = 3;
    protected $retryDelay = 60;
    
    abstract public function handle($taskData);
    
    public function failed($taskData, $exception)
    {
        // 默认失败处理:记录日志
        error_log(sprintf(
            "Task failed: %s, Error: %s",
            json_encode($taskData),
            $exception->getMessage()
        ));
    }
    
    public function shouldRetry($retryCount, $exception)
    {
        return $retryCount < $this->maxRetries;
    }
    
    public function getRetryDelay($retryCount)
    {
        return $this->retryDelay * pow(2, $retryCount);
    }
}

/**
 * 邮件发送任务处理器
 */
class EmailTaskHandler extends TaskHandler
{
    public function handle($taskData)
    {
        $email = $taskData['email'];
        $subject = $taskData['subject'];
        $content = $taskData['content'];
        
        // 模拟邮件发送
        if (!filter_var($email, FILTER_VALIDATE_EMAIL)) {
            throw new InvalidArgumentException("Invalid email address");
        }
        
        // 实际发送邮件逻辑
        // mail($email, $subject, $content);
        
        return true;
    }
    
    public function shouldRetry($retryCount, $exception)
    {
        // 对于无效邮箱,不重试
        if ($exception instanceof InvalidArgumentException) {
            return false;
        }
        
        return parent::shouldRetry($retryCount, $exception);
    }
}
?>

四、高级特性与优化

4.1 分布式工作者实现

<?php
/**
 * 分布式任务工作者
 */
class DistributedWorker
{
    private $queue;
    private $handlers = [];
    private $isRunning = false;
    private $workerId;
    
    public function __construct(RedisDelayQueue $queue)
    {
        $this->queue = $queue;
        $this->workerId = uniqid('worker_', true);
    }
    
    public function registerHandler($taskType, TaskHandler $handler)
    {
        $this->handlers[$taskType] = $handler;
    }
    
    public function start($options = [])
    {
        $this->isRunning = true;
        $sleepTime = $options['sleep'] ?? 1;
        
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
        
        echo "Worker {$this->workerId} startedn";
        
        while ($this->isRunning) {
            pcntl_signal_dispatch();
            
            try {
                $tasks = $this->queue->getDueTasks(5);
                
                if (empty($tasks)) {
                    sleep($sleepTime);
                    continue;
                }
                
                foreach ($tasks as $task) {
                    $this->processTask($task);
                }
                
            } catch (Exception $e) {
                error_log("Worker error: " . $e->getMessage());
                sleep($sleepTime * 2);
            }
        }
    }
    
    private function processTask($task)
    {
        $taskId = $task['id'];
        $taskType = $task['data']['type'] ?? 'default';
        
        $this->queue->updateTaskStatus($taskId, 'processing');
        
        try {
            if (!isset($this->handlers[$taskType])) {
                throw new RuntimeException("No handler for task type: {$taskType}");
            }
            
            $handler = $this->handlers[$taskType];
            $result = $handler->handle($task['data']);
            
            $this->queue->updateTaskStatus($taskId, 'completed');
            
            echo "Task {$taskId} completed successfullyn";
            
        } catch (Exception $e) {
            $this->handleTaskFailure($task, $e, $handler ?? null);
        }
    }
    
    private function handleTaskFailure($task, $exception, $handler)
    {
        $taskId = $task['id'];
        $retryCount = $task['retry_count'] ?? 0;
        
        if ($handler && $handler->shouldRetry($retryCount, $exception)) {
            $delay = $handler->getRetryDelay($retryCount);
            
            $task['retry_count'] = $retryCount + 1;
            $this->queue->addTask(
                $taskId . '_retry_' . ($retryCount + 1),
                $task['data'],
                $delay,
                $task['priority']
            );
            
            echo "Task {$taskId} scheduled for retry in {$delay} secondsn";
        } else {
            $this->queue->updateTaskStatus($taskId, 'failed', $exception->getMessage());
            
            if ($handler) {
                $handler->failed($task['data'], $exception);
            }
            
            echo "Task {$taskId} failed permanently: " . $exception->getMessage() . "n";
        }
    }
    
    public function shutdown($signal)
    {
        echo "Worker {$this->workerId} shutting down...n";
        $this->isRunning = false;
    }
}
?>

五、实战案例:订单超时关闭

5.1 订单服务集成

<?php
/**
 * 订单服务 - 集成延迟队列
 */
class OrderService
{
    private $delayQueue;
    private $orderTimeout = 1800; // 30分钟
    
    public function __construct(RedisDelayQueue $queue)
    {
        $this->delayQueue = $queue;
    }
    
    /**
     * 创建订单并设置超时关闭任务
     */
    public function createOrder($orderData)
    {
        $orderId = 'ORDER_' . time() . '_' . uniqid();
        
        // 保存订单到数据库
        $this->saveOrderToDatabase($orderId, $orderData);
        
        // 添加延迟任务:30分钟后关闭未支付订单
        $taskId = 'order_timeout_' . $orderId;
        $taskData = [
            'type' => 'order_timeout',
            'order_id' => $orderId,
            'action' => 'close_unpaid_order'
        ];
        
        $this->delayQueue->addTask(
            $taskId,
            $taskData,
            $this->orderTimeout,
            8 // 较高优先级
        );
        
        return $orderId;
    }
    
    /**
     * 订单支付成功,取消超时任务
     */
    public function markOrderPaid($orderId)
    {
        // 更新订单状态
        $this->updateOrderStatus($orderId, 'paid');
        
        // 在实际应用中,这里需要实现任务取消逻辑
        // 可以通过存储任务ID到订单关联表,然后删除对应任务
    }
}

/**
 * 订单超时处理器
 */
class OrderTimeoutHandler extends TaskHandler
{
    private $orderService;
    
    public function __construct($orderService)
    {
        $this->orderService = $orderService;
        $this->maxRetries = 2;
    }
    
    public function handle($taskData)
    {
        $orderId = $taskData['order_id'];
        
        // 检查订单状态
        $order = $this->orderService->getOrder($orderId);
        
        if (!$order) {
            throw new RuntimeException("Order not found: {$orderId}");
        }
        
        if ($order['status'] !== 'pending') {
            // 订单已处理,无需操作
            return true;
        }
        
        // 执行订单关闭逻辑
        $this->orderService->closeOrder($orderId, 'timeout');
        
        // 发送通知
        $this->sendTimeoutNotification($order);
        
        return true;
    }
    
    private function sendTimeoutNotification($order)
    {
        // 发送邮件或短信通知
        $message = "您的订单 {$order['order_no']} 因超时未支付已自动关闭";
        // 实际发送逻辑...
    }
}
?>

5.2 启动脚本

<?php
// worker.php
require_once 'RedisDelayQueue.php';
require_once 'TaskHandler.php';
require_once 'DistributedWorker.php';
require_once 'OrderService.php';

// 配置Redis连接
$redisConfig = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'database' => 1
];

// 初始化队列
$queue = new RedisDelayQueue($redisConfig);

// 初始化工作者
$worker = new DistributedWorker($queue);

// 注册处理器
$orderService = new OrderService($queue);
$worker->registerHandler('order_timeout', new OrderTimeoutHandler($orderService));
$worker->registerHandler('email', new EmailTaskHandler());

// 启动工作者
$worker->start([
    'sleep' => 1, // 无任务时休眠秒数
]);

// 命令行启动:php worker.php --name=worker1
?>

六、监控与运维

6.1 监控指标收集

<?php
/**
 * 队列监控服务
 */
class QueueMonitor
{
    private $queue;
    private $metrics = [];
    
    public function __construct(RedisDelayQueue $queue)
    {
        $this->queue = $queue;
    }
    
    public function collectMetrics()
    {
        $metrics = [
            'pending_tasks' => $this->getPendingCount(),
            'delayed_tasks' => $this->getDelayedCount(),
            'processing_tasks' => $this->getProcessingCount(),
            'failed_tasks' => $this->getFailedCount(),
            'avg_process_time' => $this->getAverageProcessTime()
        ];
        
        $this->metrics = array_merge($this->metrics, $metrics);
        return $metrics;
    }
    
    private function getPendingCount()
    {
        // 获取待处理任务数量
        $now = time();
        return $this->queue->getRedis()->zCount(
            'delay_queue:tasks',
            0,
            $now
        );
    }
    
    public function generateReport()
    {
        $metrics = $this->collectMetrics();
        
        $report = [
            'timestamp' => date('Y-m-d H:i:s'),
            'metrics' => $metrics,
            'alerts' => $this->checkAlerts($metrics)
        ];
        
        return $report;
    }
    
    private function checkAlerts($metrics)
    {
        $alerts = [];
        
        if ($metrics['pending_tasks'] > 1000) {
            $alerts[] = '警告:待处理任务过多';
        }
        
        if ($metrics['failed_tasks'] > 100) {
            $alerts[] = '警告:失败任务数量异常';
        }
        
        return $alerts;
    }
}
?>

6.2 运维建议

  • Redis持久化:配置AOF和RDB确保任务不丢失
  • 监控告警:监控队列长度、处理延迟、失败率
  • 水平扩展:多个工作者实例消费同一队列
  • 灰度发布:新版本处理器逐步替换旧版本
  • 日志记录:详细记录任务生命周期便于排查

总结

本文详细介绍了基于Redis的PHP异步延迟任务系统的设计与实现。该系统具有以下特点:

  1. 高性能:利用Redis内存操作,支持高并发任务处理
  2. 可靠性:任务持久化、失败重试、状态跟踪
  3. 灵活性:支持延迟执行、优先级、分布式部署
  4. 易扩展:模块化设计,方便添加新的任务类型

通过实际案例展示了如何在电商订单系统中应用该方案,有效解决了订单超时关闭的业务需求。读者可以根据实际业务需求,扩展更多任务处理器,构建适合自己业务场景的异步任务系统。

PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级应用教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级应用教程 https://www.taomawang.com/server/php/1624.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务