PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统

2025-12-08 0 963

原创技术教程 | 作者:PHP技术专家 | 发布日期:2023年10月

一、异步任务队列的技术背景

在现代Web应用中,用户请求往往需要处理耗时操作,如发送邮件、生成报表、处理图像等。如果这些操作同步执行,会导致用户等待时间过长,影响用户体验。异步任务队列通过将耗时任务放入队列,由后台工作进程异步处理,实现了请求的快速响应。

传统同步处理的局限性:

  • HTTP请求超时限制(通常30秒)
  • 服务器资源占用时间长
  • 无法处理大量并发任务
  • 单点故障影响整个系统

Redis作为队列存储的优势:

  • 内存操作,读写速度快
  • 支持丰富的数据结构(List、Sorted Set)
  • 持久化机制保证数据安全
  • 发布订阅功能支持任务通知

二、系统架构设计

我们设计一个三层架构的异步任务系统:

1. 任务生产者(Producer)

Web应用 → 创建任务 → Redis队列

2. 任务队列(Queue)

Redis List结构存储任务
支持优先级队列
任务状态追踪

3. 任务消费者(Consumer)

常驻PHP进程
多进程并发处理
失败重试机制

数据流程图:

用户请求 → 任务创建 → Redis入队 → 返回响应
后台Worker → 监听队列 → 执行任务 → 更新状态
监控系统 → 查看状态 → 错误报警 → 日志记录
                

三、完整代码实现

1. 环境要求

PHP 7.4+
Redis 5.0+
Composer依赖管理
PCNTL扩展(多进程支持)

2. 核心队列类实现

<?php
/**
 * Redis任务队列核心类
 */
class RedisTaskQueue
{
    private $redis;
    private $queueKey = 'task_queue';
    private $processingKey = 'task_processing';
    private $failedKey = 'task_failed';
    
    public function __construct($host = '127.0.0.1', $port = 6379)
    {
        $this->redis = new Redis();
        $this->redis->connect($host, $port);
        $this->redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP);
    }
    
    /**
     * 添加任务到队列
     */
    public function push($task, $priority = 'normal')
    {
        $taskData = [
            'id' => uniqid('task_', true),
            'data' => $task,
            'priority' => $priority,
            'created_at' => time(),
            'status' => 'pending'
        ];
        
        $queueKey = $this->getPriorityKey($priority);
        return $this->redis->lPush($queueKey, $taskData);
    }
    
    /**
     * 获取任务(阻塞式)
     */
    public function pop($timeout = 30)
    {
        // 优先处理高优先级队列
        $task = $this->redis->brPop(['high_priority_queue', 'normal_queue'], $timeout);
        
        if ($task) {
            $taskData = $task[1];
            $taskData['status'] = 'processing';
            $taskData['started_at'] = time();
            
            // 移动到处理中队列
            $this->redis->hSet($this->processingKey, $taskData['id'], $taskData);
            
            return $taskData;
        }
        
        return null;
    }
    
    /**
     * 完成任务
     */
    public function complete($taskId, $result = null)
    {
        $task = $this->redis->hGet($this->processingKey, $taskId);
        
        if ($task) {
            $task['status'] = 'completed';
            $task['completed_at'] = time();
            $task['result'] = $result;
            
            // 保存到完成记录(可设置过期时间)
            $this->redis->hSet('task_completed', $taskId, $task);
            $this->redis->hDel($this->processingKey, $taskId);
            
            return true;
        }
        
        return false;
    }
    
    /**
     * 任务失败处理
     */
    public function fail($taskId, $error)
    {
        $task = $this->redis->hGet($this->processingKey, $taskId);
        
        if ($task) {
            $task['status'] = 'failed';
            $task['failed_at'] = time();
            $task['error'] = $error;
            $task['retry_count'] = ($task['retry_count'] ?? 0) + 1;
            
            // 重试逻辑(最多3次)
            if ($task['retry_count'] push($task['data'], $task['priority']);
            } else {
                $this->redis->lPush($this->failedKey, $task);
            }
            
            $this->redis->hDel($this->processingKey, $taskId);
        }
    }
    
    private function getPriorityKey($priority)
    {
        return $priority . '_priority_queue';
    }
    
    /**
     * 获取队列统计信息
     */
    public function getStats()
    {
        return [
            'pending_high' => $this->redis->lLen('high_priority_queue'),
            'pending_normal' => $this->redis->lLen('normal_queue'),
            'processing' => $this->redis->hLen($this->processingKey),
            'failed' => $this->redis->lLen($this->failedKey)
        ];
    }
}
?>

3. Worker工作进程实现

<?php
/**
 * 任务工作进程管理器
 */
class TaskWorkerManager
{
    private $queue;
    private $workers = [];
    private $maxWorkers = 5;
    private $shutdown = false;
    
    public function __construct(RedisTaskQueue $queue)
    {
        $this->queue = $queue;
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
    }
    
    /**
     * 启动工作进程
     */
    public function start()
    {
        echo "启动任务工作进程...n";
        
        for ($i = 0; $i maxWorkers; $i++) {
            $this->forkWorker($i);
        }
        
        // 监控子进程
        while (!$this->shutdown) {
            pcntl_signal_dispatch();
            
            $status = null;
            $pid = pcntl_wait($status, WNOHANG);
            
            if ($pid > 0) {
                echo "工作进程 {$pid} 退出,重新启动...n";
                $this->restartWorker($pid);
            }
            
            sleep(1);
        }
        
        $this->stopAllWorkers();
    }
    
    private function forkWorker($workerId)
    {
        $pid = pcntl_fork();
        
        if ($pid == -1) {
            die("无法创建子进程");
        } elseif ($pid == 0) {
            // 子进程
            $this->runWorker($workerId);
            exit(0);
        } else {
            // 父进程记录PID
            $this->workers[$pid] = [
                'id' => $workerId,
                'started_at' => time()
            ];
        }
    }
    
    private function runWorker($workerId)
    {
        echo "工作进程 {$workerId} 启动,PID: " . getmypid() . "n";
        
        $taskHandler = new TaskHandler();
        
        while (!$this->shutdown) {
            try {
                // 获取任务(阻塞30秒)
                $task = $this->queue->pop(30);
                
                if ($task) {
                    echo "进程 {$workerId} 处理任务: {$task['id']}n";
                    
                    // 执行任务
                    $result = $taskHandler->handle($task['data']);
                    
                    // 标记完成
                    $this->queue->complete($task['id'], $result);
                    
                    echo "进程 {$workerId} 完成任务: {$task['id']}n";
                }
                
            } catch (Exception $e) {
                error_log("工作进程错误: " . $e->getMessage());
                
                if (isset($task)) {
                    $this->queue->fail($task['id'], $e->getMessage());
                }
                
                // 短暂休息后继续
                sleep(5);
            }
        }
    }
    
    public function shutdown()
    {
        echo "接收到关闭信号,停止所有工作进程...n";
        $this->shutdown = true;
    }
    
    private function stopAllWorkers()
    {
        foreach (array_keys($this->workers) as $pid) {
            posix_kill($pid, SIGTERM);
        }
    }
    
    private function restartWorker($oldPid)
    {
        if (isset($this->workers[$oldPid])) {
            $workerId = $this->workers[$oldPid]['id'];
            unset($this->workers[$oldPid]);
            $this->forkWorker($workerId);
        }
    }
}

/**
 * 任务处理器基类
 */
abstract class TaskHandler
{
    abstract public function handle($taskData);
    
    protected function log($message)
    {
        file_put_contents(
            'task_log.log',
            date('Y-m-d H:i:s') . " - " . $message . "n",
            FILE_APPEND
        );
    }
}
?>

四、实战案例:邮件发送系统

1. 邮件任务处理器

<?php
/**
 * 邮件发送任务处理器
 */
class EmailTaskHandler extends TaskHandler
{
    private $mailer;
    
    public function __construct()
    {
        // 初始化邮件发送器(这里使用PHPMailer示例)
        $this->mailer = new PHPMailerPHPMailerPHPMailer();
        $this->mailer->isSMTP();
        $this->mailer->Host = 'smtp.example.com';
        $this->mailer->SMTPAuth = true;
        $this->mailer->Username = 'user@example.com';
        $this->mailer->Password = 'password';
        $this->mailer->SMTPSecure = 'tls';
        $this->mailer->Port = 587;
    }
    
    public function handle($taskData)
    {
        $this->log("开始发送邮件到: " . $taskData['to']);
        
        $this->mailer->setFrom('noreply@example.com', '系统通知');
        $this->mailer->addAddress($taskData['to']);
        $this->mailer->Subject = $taskData['subject'];
        $this->mailer->Body = $taskData['body'];
        $this->mailer->isHTML(true);
        
        if ($this->mailer->send()) {
            $this->log("邮件发送成功: " . $taskData['to']);
            return ['success' => true, 'message_id' => $this->mailer->getLastMessageID()];
        } else {
            throw new Exception("邮件发送失败: " . $this->mailer->ErrorInfo);
        }
    }
}
?>

2. Web控制器中使用队列

<?php
/**
 * 用户注册控制器示例
 */
class RegisterController
{
    private $queue;
    
    public function __construct()
    {
        $this->queue = new RedisTaskQueue();
    }
    
    public function register($userData)
    {
        // 1. 保存用户到数据库
        $userId = $this->saveUser($userData);
        
        // 2. 创建欢迎邮件任务(高优先级)
        $emailTask = [
            'to' => $userData['email'],
            'subject' => '欢迎注册我们的服务',
            'body' => $this->renderWelcomeEmail($userData),
            'user_id' => $userId
        ];
        
        $this->queue->push($emailTask, 'high');
        
        // 3. 创建后续跟进任务(普通优先级)
        $followupTask = [
            'type' => 'followup_email',
            'user_id' => $userId,
            'scheduled_time' => time() + 86400 // 24小时后
        ];
        
        $this->queue->push($followupTask, 'normal');
        
        // 4. 立即返回响应
        return [
            'success' => true,
            'message' => '注册成功,欢迎邮件已发送',
            'user_id' => $userId
        ];
    }
    
    private function renderWelcomeEmail($userData)
    {
        return "<h1>欢迎,{$userData['name']}!</h1>
                <p>感谢您注册我们的服务。</p>
                <p>您的账户已经激活,可以立即开始使用。</p>";
    }
}

// 使用示例
$controller = new RegisterController();
$result = $controller->register([
    'name' => '张三',
    'email' => 'zhangsan@example.com',
    'password' => 'hashed_password'
]);

echo json_encode($result);
?>

3. 启动Worker进程

<?php
// worker.php
require_once 'vendor/autoload.php';
require_once 'RedisTaskQueue.php';
require_once 'TaskWorkerManager.php';
require_once 'EmailTaskHandler.php';

// 注册自定义错误处理
set_error_handler(function($errno, $errstr, $errfile, $errline) {
    error_log("Worker错误: [$errno] $errstr in $errfile on line $errline");
});

// 创建队列实例
$queue = new RedisTaskQueue('127.0.0.1', 6379);

// 启动Worker管理器
$manager = new TaskWorkerManager($queue);
$manager->start();

// 命令行启动:php worker.php --daemon
?>

五、性能优化与监控

1. 性能优化策略

  • 连接池管理:复用Redis连接,避免频繁创建连接
  • 批量处理:一次处理多个任务,减少IO操作
  • 内存控制:监控Worker内存使用,防止内存泄漏
  • 动态扩缩容:根据队列长度自动调整Worker数量

2. 监控系统实现

<?php
/**
 * 队列监控仪表板
 */
class QueueMonitor
{
    private $queue;
    private $redis;
    
    public function __construct(RedisTaskQueue $queue)
    {
        $this->queue = $queue;
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }
    
    public function getDashboardData()
    {
        $stats = $this->queue->getStats();
        
        // 获取处理速度
        $hourlyStats = $this->redis->get('queue_stats:hourly');
        $dailyStats = $this->redis->get('queue_stats:daily');
        
        // 获取失败任务详情
        $failedTasks = $this->redis->lRange('task_failed', 0, 9);
        
        return [
            'current_stats' => $stats,
            'performance' => [
                'tasks_per_hour' => $hourlyStats['completed'] ?? 0,
                'avg_processing_time' => $this->calculateAvgTime(),
                'failure_rate' => $this->calculateFailureRate()
            ],
            'recent_failures' => $failedTasks,
            'system_health' => $this->checkSystemHealth()
        ];
    }
    
    public function alertIfNeeded($stats)
    {
        // 失败率超过5%报警
        $failureRate = $this->calculateFailureRate();
        if ($failureRate > 5) {
            $this->sendAlert("队列失败率过高: {$failureRate}%");
        }
        
        // 积压任务过多报警
        if ($stats['pending_normal'] > 1000) {
            $this->sendAlert("队列积压严重: {$stats['pending_normal']} 个任务等待");
        }
    }
    
    private function sendAlert($message)
    {
        // 集成到报警系统(邮件、Slack、钉钉等)
        file_put_contents('alerts.log', date('Y-m-d H:i:s') . " - " . $message . "n", FILE_APPEND);
    }
}
?>

3. 高级特性扩展

  • 延迟队列:使用Redis的Sorted Set实现定时任务
  • 任务依赖:实现任务之间的依赖关系
  • 分布式部署:多服务器部署Worker,实现负载均衡
  • 任务优先级:支持多级优先级队列
  • 任务进度追踪:实时反馈任务执行进度

4. 生产环境部署建议

  1. 使用Supervisor管理Worker进程
  2. 配置Redis持久化和主从复制
  3. 实现日志轮转和监控告警
  4. 设置合理的任务超时时间
  5. 定期清理已完成的任务记录

六、总结与展望

本文详细介绍了基于PHP和Redis的异步任务队列系统的设计与实现。通过这个系统,我们可以:

  • 显著提升Web应用的响应速度
  • 提高系统的吞吐量和并发处理能力
  • 实现任务的可靠执行和失败重试
  • 方便地扩展新的任务类型

技术要点回顾:

  1. Redis的List数据结构非常适合实现队列
  2. PHP的多进程编程需要谨慎处理资源竞争
  3. 任务状态管理是保证系统可靠性的关键
  4. 监控和报警是生产环境必不可少的组件

未来扩展方向:

  • 集成更专业的消息队列(RabbitMQ、Kafka)
  • 实现可视化任务管理界面
  • 支持分布式事务和最终一致性
  • 容器化部署和自动扩缩容

通过本文的学习,您已经掌握了构建高性能PHP异步任务系统的核心技能。在实际项目中,可以根据具体需求调整和优化这个基础框架,构建出适合自己业务场景的异步处理系统。

PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统 https://www.taomawang.com/server/php/1481.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务