PHP异步任务队列实战:基于Redis实现高性能后台任务处理器 | 现代Web开发指南

2026-01-29 0 364
免费资源下载

一、异步任务队列的必要性

在现代Web应用中,用户请求往往需要处理耗时操作,如发送邮件、生成报表、处理图片等。如果这些操作同步执行,会导致用户等待时间过长,影响用户体验。异步任务队列通过将耗时任务放入队列,由后台进程异步处理,实现了请求的快速响应。

本文将详细讲解如何使用PHP和Redis构建一个完整的异步任务队列系统,包含任务发布、消费、重试、失败处理等完整功能。

二、系统架构设计

我们的异步队列系统包含以下核心组件:

  1. 任务生产者(Producer):负责创建和发布任务到Redis队列
  2. Redis存储:使用List数据结构存储待处理任务,Sorted Set处理延迟任务
  3. 任务消费者(Consumer):守护进程,持续从队列获取并执行任务
  4. 监控系统:实时监控队列状态和任务执行情况
+-------------+     +-------------+     +-------------+
|   Web应用   |---->|   Redis     |---->|   Worker    |
|  (生产者)   |     |   (队列)    |     |  (消费者)   |
+-------------+     +-------------+     +-------------+

三、完整代码实现

3.1 Redis队列管理器

<?php
/**
 * Redis任务队列管理器
 */
class RedisQueueManager
{
    private $redis;
    private $queueName = 'async_tasks';
    private $delayQueueName = 'delay_tasks';
    
    public function __construct($config = [])
    {
        $this->redis = new Redis();
        $this->redis->connect(
            $config['host'] ?? '127.0.0.1',
            $config['port'] ?? 6379
        );
        
        if (!empty($config['password'])) {
            $this->redis->auth($config['password']);
        }
        
        if (!empty($config['database'])) {
            $this->redis->select($config['database']);
        }
    }
    
    /**
     * 发布即时任务
     */
    public function publish($taskType, $data, $priority = 'normal')
    {
        $task = [
            'id' => uniqid('task_', true),
            'type' => $taskType,
            'data' => $data,
            'created_at' => time(),
            'priority' => $priority,
            'attempts' => 0,
            'max_attempts' => 3
        ];
        
        $jsonTask = json_encode($task, JSON_UNESCAPED_UNICODE);
        
        // 根据优先级选择队列
        if ($priority === 'high') {
            return $this->redis->lPush($this->queueName . ':high', $jsonTask);
        } else {
            return $this->redis->lPush($this->queueName, $jsonTask);
        }
    }
    
    /**
     * 发布延迟任务
     */
    public function publishDelay($taskType, $data, $delaySeconds)
    {
        $task = [
            'id' => uniqid('delay_', true),
            'type' => $taskType,
            'data' => $data,
            'created_at' => time(),
            'execute_at' => time() + $delaySeconds
        ];
        
        $jsonTask = json_encode($task, JSON_UNESCAPED_UNICODE);
        
        return $this->redis->zAdd(
            $this->delayQueueName,
            time() + $delaySeconds,
            $jsonTask
        );
    }
    
    /**
     * 消费任务
     */
    public function consume($timeout = 5)
    {
        // 先检查延迟队列中是否有到期的任务
        $this->moveDelayedTasks();
        
        // 优先处理高优先级队列
        $task = $this->redis->rPop($this->queueName . ':high');
        
        if (!$task) {
            $task = $this->redis->brPop([$this->queueName], $timeout);
            if ($task) {
                $task = $task[1]; // brPop返回数组
            }
        }
        
        return $task ? json_decode($task, true) : null;
    }
    
    /**
     * 将到期的延迟任务移到主队列
     */
    private function moveDelayedTasks()
    {
        $now = time();
        $tasks = $this->redis->zRangeByScore(
            $this->delayQueueName,
            0,
            $now
        );
        
        if (!empty($tasks)) {
            foreach ($tasks as $task) {
                $this->redis->lPush($this->queueName, $task);
                $this->redis->zRem($this->delayQueueName, $task);
            }
        }
    }
    
    /**
     * 获取队列状态
     */
    public function getQueueStats()
    {
        return [
            'pending_normal' => $this->redis->lLen($this->queueName),
            'pending_high' => $this->redis->lLen($this->queueName . ':high'),
            'pending_delayed' => $this->redis->zCard($this->delayQueueName)
        ];
    }
}

3.2 任务处理器基类

<?php
/**
 * 任务处理器抽象基类
 */
abstract class TaskHandler
{
    protected $maxRetries = 3;
    protected $retryDelay = 60; // 重试延迟秒数
    
    abstract public function handle($taskData);
    
    public function failed($taskData, $exception)
    {
        // 记录失败日志
        error_log(sprintf(
            "任务处理失败: %s, 错误: %s",
            json_encode($taskData),
            $exception->getMessage()
        ));
        
        // 可以发送告警通知
        $this->sendAlert($taskData, $exception);
    }
    
    protected function sendAlert($taskData, $exception)
    {
        // 实现告警逻辑,如发送邮件、Slack通知等
    }
}

/**
 * 邮件发送任务处理器
 */
class EmailTaskHandler extends TaskHandler
{
    public function handle($taskData)
    {
        $to = $taskData['to'];
        $subject = $taskData['subject'];
        $body = $taskData['body'];
        
        // 模拟邮件发送
        $success = $this->sendEmail($to, $subject, $body);
        
        if (!$success) {
            throw new Exception('邮件发送失败');
        }
        
        return true;
    }
    
    private function sendEmail($to, $subject, $body)
    {
        // 实际邮件发送逻辑
        // 这里使用模拟实现
        sleep(1); // 模拟耗时操作
        return true;
    }
}

/**
 * 图片处理任务处理器
 */
class ImageProcessHandler extends TaskHandler
{
    public function handle($taskData)
    {
        $imagePath = $taskData['path'];
        $operations = $taskData['operations'];
        
        foreach ($operations as $operation) {
            switch ($operation['type']) {
                case 'resize':
                    $this->resizeImage($imagePath, $operation['width'], $operation['height']);
                    break;
                case 'watermark':
                    $this->addWatermark($imagePath, $operation['text']);
                    break;
                case 'compress':
                    $this->compressImage($imagePath, $operation['quality']);
                    break;
            }
        }
        
        return true;
    }
    
    private function resizeImage($path, $width, $height)
    {
        // 图片缩放实现
        sleep(2); // 模拟耗时操作
    }
    
    private function addWatermark($path, $text)
    {
        // 添加水印实现
        sleep(1);
    }
    
    private function compressImage($path, $quality)
    {
        // 图片压缩实现
        sleep(1);
    }
}

3.3 消费者守护进程

<?php
/**
 * 队列消费者守护进程
 */
class QueueConsumerDaemon
{
    private $queueManager;
    private $handlers = [];
    private $running = false;
    
    public function __construct(RedisQueueManager $queueManager)
    {
        $this->queueManager = $queueManager;
    }
    
    public function registerHandler($taskType, TaskHandler $handler)
    {
        $this->handlers[$taskType] = $handler;
    }
    
    public function start($maxTasks = null)
    {
        $this->running = true;
        $processed = 0;
        
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
        
        echo "消费者守护进程启动...n";
        
        while ($this->running) {
            // 检查信号
            pcntl_signal_dispatch();
            
            $task = $this->queueManager->consume();
            
            if ($task) {
                $this->processTask($task);
                $processed++;
                
                // 如果设置了最大处理任务数
                if ($maxTasks && $processed >= $maxTasks) {
                    $this->shutdown();
                }
            }
            
            // 防止CPU空转
            if (!$task) {
                sleep(1);
            }
        }
        
        echo "消费者守护进程停止n";
    }
    
    private function processTask($task)
    {
        $taskType = $task['type'];
        
        if (!isset($this->handlers[$taskType])) {
            error_log("未注册的任务类型: {$taskType}");
            return;
        }
        
        $handler = $this->handlers[$taskType];
        
        try {
            echo "处理任务: {$task['id']} [{$taskType}]n";
            $handler->handle($task['data']);
            echo "任务完成: {$task['id']}n";
            
        } catch (Exception $e) {
            $task['attempts']++;
            
            if ($task['attempts'] retryDelay;
                $this->queueManager->publishDelay(
                    $taskType,
                    $task['data'],
                    $handler->retryDelay
                );
                echo "任务重试: {$task['id']} (尝试 {$task['attempts']})n";
            } else {
                // 最终失败
                $handler->failed($task['data'], $e);
                echo "任务失败: {$task['id']}n";
            }
        }
    }
    
    public function shutdown()
    {
        $this->running = false;
    }
}

四、实战应用示例

4.1 生产者示例(Web应用中使用)

<?php
// 在控制器或服务中使用
class UserController
{
    public function registerAction($userData)
    {
        // 保存用户到数据库
        $userId = $this->saveUser($userData);
        
        // 初始化队列管理器
        $queue = new RedisQueueManager([
            'host' => 'localhost',
            'port' => 6379,
            'database' => 0
        ]);
        
        // 发送欢迎邮件(异步)
        $queue->publish('email', [
            'to' => $userData['email'],
            'subject' => '欢迎注册',
            'body' => '感谢您注册我们的服务!'
        ]);
        
        // 延迟任务:注册24小时后发送提醒邮件
        $queue->publishDelay('email', [
            'to' => $userData['email'],
            'subject' => '快来体验更多功能',
            'body' => '您已经注册24小时了,快来体验我们的高级功能吧!'
        ], 86400); // 24小时后
        
        // 处理用户头像(异步)
        if (!empty($_FILES['avatar'])) {
            $queue->publish('image_process', [
                'path' => $_FILES['avatar']['tmp_name'],
                'operations' => [
                    ['type' => 'resize', 'width' => 200, 'height' => 200],
                    ['type' => 'compress', 'quality' => 80]
                ]
            ], 'high'); // 高优先级
        }
        
        // 立即返回响应,无需等待任务完成
        return [
            'success' => true,
            'message' => '注册成功,欢迎邮件已发送',
            'user_id' => $userId
        ];
    }
}

4.2 启动消费者守护进程

<?php
// consumer.php
require_once 'RedisQueueManager.php';
require_once 'TaskHandler.php';
require_once 'QueueConsumerDaemon.php';

// 初始化队列管理器
$queueManager = new RedisQueueManager([
    'host' => 'localhost',
    'port' => 6379
]);

// 初始化消费者
$consumer = new QueueConsumerDaemon($queueManager);

// 注册任务处理器
$consumer->registerHandler('email', new EmailTaskHandler());
$consumer->registerHandler('image_process', new ImageProcessHandler());

// 启动消费者(无限运行)
$consumer->start();

// 或者指定最大处理任务数(适合测试)
// $consumer->start(100); // 处理100个任务后停止

4.3 监控队列状态

<?php
// monitor.php
$queueManager = new RedisQueueManager();

// 获取队列统计信息
$stats = $queueManager->getQueueStats();

echo "队列状态监控:n";
echo "普通待处理任务: {$stats['pending_normal']}n";
echo "高优先级任务: {$stats['pending_high']}n";
echo "延迟任务: {$stats['pending_delayed']}n";

// 可以集成到管理后台或监控系统
if ($stats['pending_normal'] > 1000) {
    // 发送告警
    mail('admin@example.com', '队列积压告警', '普通任务队列积压超过1000');
}

五、高级特性与优化

5.1 多进程消费

为了提高消费能力,可以启动多个消费者进程:

<?php
// multi_consumer.php
$maxWorkers = 4; // 根据CPU核心数调整

for ($i = 0; $i registerHandler('email', new EmailTaskHandler());
        $consumer->start();
        exit(0);
    }
}

// 等待所有子进程
while (pcntl_waitpid(0, $status) != -1) {
    $status = pcntl_wexitstatus($status);
    echo "子进程退出,状态: {$status}n";
}

5.2 任务优先级与延迟队列

系统支持三种队列类型:

  • 高优先级队列:立即处理的重要任务
  • 普通队列:常规异步任务
  • 延迟队列:定时执行的任务

5.3 失败重试机制

每个任务支持配置最大重试次数,失败的任务会延迟重试,避免立即重试造成的资源浪费。

六、总结与最佳实践

6.1 性能优化建议

  1. 连接池:在生产环境中使用Redis连接池
  2. 序列化优化:对于大型数据,考虑使用MessagePack替代JSON
  3. 监控告警:实现完整的监控和告警系统
  4. 优雅停机:消费者应支持优雅停机,处理完当前任务后再退出

6.2 扩展方向

  • 添加任务进度追踪功能
  • 实现任务依赖关系
  • 添加任务取消机制
  • 集成到Laravel、Symfony等框架

6.3 生产环境部署

建议使用Supervisor管理消费者进程:

; /etc/supervisor/conf.d/queue-worker.conf
[program:queue-worker]
command=php /path/to/consumer.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/queue-worker.log
PHP异步任务队列实战:基于Redis实现高性能后台任务处理器 | 现代Web开发指南
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis实现高性能后台任务处理器 | 现代Web开发指南 https://www.taomawang.com/server/php/1569.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务