PHP异步任务队列系统实战:基于Redis实现高性能后台任务处理 | PHP高级编程教程

2026-01-29 0 434
免费资源下载

基于Redis构建企业级高性能后台任务处理系统

为什么需要异步任务队列?

在现代Web应用中,许多操作如邮件发送、图片处理、数据报表生成等耗时任务会阻塞用户请求,导致响应延迟。异步任务队列通过将耗时任务放入后台处理,实现请求的快速响应和系统解耦。

本教程将构建一个完整的PHP异步任务队列系统,包含任务生产者、消费者、监控和失败重试机制。

一、系统架构设计

📤 任务生产者

Web应用将任务序列化后推送到Redis队列,立即返回响应给用户。

🔄 Redis队列

使用Redis List结构存储待处理任务,Sorted Set存储延迟任务。

⚙️ Worker消费者

常驻PHP进程从队列获取任务并执行,支持多进程并发处理。

📊 监控系统

实时监控队列长度、处理速度、失败任务等关键指标。

二、核心代码实现

1. 任务基类定义 (Task.php)

<?php

abstract class Task
{
    protected $id;
    protected $data;
    protected $attempts = 0;
    protected $maxAttempts = 3;
    protected $delay = 0;
    protected $createdAt;
    
    public function __construct(array $data = [])
    {
        $this->id = uniqid('task_', true);
        $this->data = $data;
        $this->createdAt = time();
    }
    
    // 抽象方法,子类必须实现
    abstract public function handle(): bool;
    
    // 任务失败时的处理
    public function failed(Exception $e): void
    {
        error_log("Task {$this->id} failed: " . $e->getMessage());
    }
    
    // 序列化任务
    public function serialize(): string
    {
        return json_encode([
            'id' => $this->id,
            'class' => get_class($this),
            'data' => $this->data,
            'attempts' => $this->attempts,
            'max_attempts' => $this->maxAttempts,
            'delay' => $this->delay,
            'created_at' => $this->createdAt
        ], JSON_UNESCAPED_UNICODE);
    }
    
    // 反序列化任务
    public static function unserialize(string $data): self
    {
        $decoded = json_decode($data, true);
        $task = new $decoded['class']($decoded['data']);
        $task->id = $decoded['id'];
        $task->attempts = $decoded['attempts'];
        $task->maxAttempts = $decoded['max_attempts'];
        $task->delay = $decoded['delay'];
        $task->createdAt = $decoded['created_at'];
        return $task;
    }
    
    // Getter方法
    public function getId(): string { return $this->id; }
    public function getAttempts(): int { return $this->attempts; }
    public function incrementAttempts(): void { $this->attempts++; }
    public function canRetry(): bool { return $this->attempts maxAttempts; }
}

2. 队列管理器 (QueueManager.php)

<?php

class QueueManager
{
    private $redis;
    private $queueName = 'default_queue';
    private $delayedQueueName = 'delayed_queue';
    private $failedQueueName = 'failed_queue';
    
    public function __construct(string $host = '127.0.0.1', int $port = 6379)
    {
        $this->redis = new Redis();
        $this->redis->connect($host, $port);
        $this->redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_NONE);
    }
    
    // 推送任务到队列
    public function push(Task $task, string $queue = null): bool
    {
        $queue = $queue ?? $this->queueName;
        
        if ($task->delay > 0) {
            // 延迟任务使用Sorted Set存储
            $score = time() + $task->delay;
            return $this->redis->zAdd(
                $this->delayedQueueName . ':' . $queue,
                $score,
                $task->serialize()
            ) > 0;
        }
        
        // 立即执行的任务使用List存储
        return $this->redis->lPush($queue, $task->serialize()) > 0;
    }
    
    // 从队列获取任务
    public function pop(string $queue = null): ?Task
    {
        $queue = $queue ?? $this->queueName;
        
        // 检查延迟队列是否有到期的任务
        $this->moveDelayedTasks($queue);
        
        // 阻塞式获取任务,超时时间5秒
        $result = $this->redis->brPop([$queue], 5);
        
        if ($result === false) {
            return null;
        }
        
        try {
            return Task::unserialize($result[1]);
        } catch (Exception $e) {
            error_log("Failed to unserialize task: " . $e->getMessage());
            return null;
        }
    }
    
    // 将延迟任务移动到就绪队列
    private function moveDelayedTasks(string $queue): void
    {
        $now = time();
        $delayedKey = $this->delayedQueueName . ':' . $queue;
        
        // 获取所有到期的任务
        $tasks = $this->redis->zRangeByScore($delayedKey, 0, $now);
        
        if (!empty($tasks)) {
            // 使用管道批量操作
            $pipe = $this->redis->multi(Redis::PIPELINE);
            foreach ($tasks as $task) {
                $pipe->lPush($queue, $task);
                $pipe->zRem($delayedKey, $task);
            }
            $pipe->exec();
        }
    }
    
    // 记录失败任务
    public function markAsFailed(Task $task, string $error): void
    {
        $failedData = [
            'task' => $task->serialize(),
            'error' => $error,
            'failed_at' => date('Y-m-d H:i:s'),
            'attempts' => $task->getAttempts()
        ];
        
        $this->redis->lPush(
            $this->failedQueueName . ':' . ($queue ?? $this->queueName),
            json_encode($failedData, JSON_UNESCAPED_UNICODE)
        );
    }
    
    // 获取队列统计信息
    public function getStats(string $queue = null): array
    {
        $queue = $queue ?? $this->queueName;
        
        return [
            'pending' => $this->redis->lLen($queue),
            'delayed' => $this->redis->zCard($this->delayedQueueName . ':' . $queue),
            'failed' => $this->redis->lLen($this->failedQueueName . ':' . $queue)
        ];
    }
}

3. 具体任务示例:邮件发送任务 (SendEmailTask.php)

<?php

class SendEmailTask extends Task
{
    private $to;
    private $subject;
    private $body;
    private $headers;
    
    public function __construct(array $data)
    {
        parent::__construct($data);
        $this->to = $data['to'] ?? '';
        $this->subject = $data['subject'] ?? '';
        $this->body = $data['body'] ?? '';
        $this->headers = $data['headers'] ?? [];
        
        // 邮件任务最多重试2次
        $this->maxAttempts = 2;
    }
    
    public function handle(): bool
    {
        // 模拟邮件发送过程
        echo "Sending email to: {$this->to}n";
        echo "Subject: {$this->subject}n";
        
        // 模拟网络延迟
        sleep(rand(1, 3));
        
        // 模拟90%的成功率
        if (rand(1, 10)  %s",
            date('Y-m-d H:i:s'),
            $this->to,
            $e->getMessage()
        );
        
        file_put_contents('email_failures.log', $logMessage . PHP_EOL, FILE_APPEND);
    }
}

三、Worker进程实现

Worker守护进程 (worker.php)

<?php

require_once 'Task.php';
require_once 'QueueManager.php';
require_once 'SendEmailTask.php';

class Worker
{
    private $queueManager;
    private $shouldStop = false;
    private $processedCount = 0;
    private $startTime;
    
    public function __construct()
    {
        $this->queueManager = new QueueManager();
        $this->startTime = time();
        
        // 注册信号处理器,实现优雅退出
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'stop']);
        pcntl_signal(SIGINT, [$this, 'stop']);
    }
    
    public function start(string $queue = null): void
    {
        echo "Worker started at " . date('Y-m-d H:i:s') . "n";
        echo "Listening on queue: " . ($queue ?? 'default') . "n";
        
        while (!$this->shouldStop) {
            $this->processNextTask($queue);
            
            // 每处理100个任务输出一次状态
            if ($this->processedCount % 100 === 0 && $this->processedCount > 0) {
                $this->printStats($queue);
            }
        }
        
        echo "Worker stopped gracefully.n";
    }
    
    private function processNextTask(string $queue = null): void
    {
        $task = $this->queueManager->pop($queue);
        
        if ($task === null) {
            // 队列为空,短暂休眠避免CPU空转
            usleep(100000); // 100ms
            return;
        }
        
        try {
            echo "Processing task: {$task->getId()}n";
            
            $task->incrementAttempts();
            $success = $task->handle();
            
            if ($success) {
                echo "Task {$task->getId()} completed successfully.n";
                $this->processedCount++;
            } else {
                throw new Exception("Task handle() returned false");
            }
            
        } catch (Exception $e) {
            echo "Task {$task->getId()} failed: " . $e->getMessage() . "n";
            
            if ($task->canRetry()) {
                // 重试任务,增加延迟时间(指数退避)
                $retryDelay = pow(2, $task->getAttempts() - 1) * 60; // 1m, 2m, 4m...
                $task->delay = $retryDelay;
                $this->queueManager->push($task, $queue);
                echo "Task scheduled for retry in {$retryDelay} seconds.n";
            } else {
                // 记录到失败队列
                $this->queueManager->markAsFailed($task, $e->getMessage());
                $task->failed($e);
                echo "Task moved to failed queue after {$task->getAttempts()} attempts.n";
            }
        }
    }
    
    private function printStats(string $queue = null): void
    {
        $stats = $this->queueManager->getStats($queue);
        $uptime = time() - $this->startTime;
        
        echo "n=== Worker Stats ===n";
        echo "Uptime: " . gmdate("H:i:s", $uptime) . "n";
        echo "Processed: {$this->processedCount}n";
        echo "Pending: {$stats['pending']}n";
        echo "Delayed: {$stats['delayed']}n";
        echo "Failed: {$stats['failed']}n";
        echo "===================nn";
    }
    
    public function stop(): void
    {
        echo "nReceived stop signal, finishing current task...n";
        $this->shouldStop = true;
    }
}

// 启动Worker
if (php_sapi_name() === 'cli') {
    $worker = new Worker();
    $queue = $argv[1] ?? null;
    $worker->start($queue);
} else {
    die("This script must be run from command line.n");
}

四、Web应用集成示例

在Laravel控制器中使用任务队列

<?php

namespace AppHttpControllers;

use AppJobsSendWelcomeEmail;
use AppJobsProcessUserUpload;
use AppJobsGenerateDataReport;
use IlluminateHttpRequest;

class UserController extends Controller
{
    public function register(Request $request)
    {
        // 验证用户输入
        $validated = $request->validate([
            'name' => 'required|string|max:255',
            'email' => 'required|email|unique:users',
            'password' => 'required|min:8',
        ]);
        
        // 同步操作:创建用户记录
        $user = User::create($validated);
        
        // 异步任务1:发送欢迎邮件(立即执行)
        $emailTask = new SendEmailTask([
            'to' => $user->email,
            'subject' => '欢迎注册我们的服务',
            'body' => $this->generateWelcomeEmail($user->name),
            'template' => 'welcome'
        ]);
        
        $queueManager = new QueueManager();
        $queueManager->push($emailTask);
        
        // 异步任务2:处理用户头像(延迟5分钟执行)
        if ($request->hasFile('avatar')) {
            $avatarTask = new ProcessImageTask([
                'user_id' => $user->id,
                'image_path' => $request->file('avatar')->store('temp'),
                'sizes' => ['small', 'medium', 'large']
            ]);
            $avatarTask->delay = 300; // 5分钟延迟
            $queueManager->push($avatarTask, 'image_processing');
        }
        
        // 异步任务3:生成初始数据报告(低优先级队列)
        $reportTask = new GenerateReportTask([
            'user_id' => $user->id,
            'report_type' => 'initial_analysis',
            'time_range' => 'all'
        ]);
        $queueManager->push($reportTask, 'low_priority');
        
        // 立即返回响应,不等待任务完成
        return response()->json([
            'message' => '注册成功,欢迎邮件已发送',
            'user_id' => $user->id,
            'queued_tasks' => 3
        ], 201);
    }
    
    public function batchImport(Request $request)
    {
        // 批量导入用户,使用任务队列处理
        $users = $request->input('users');
        $importId = uniqid('import_');
        
        foreach (array_chunk($users, 100) as $chunkIndex => $chunk) {
            $importTask = new BatchImportTask([
                'import_id' => $importId,
                'chunk_index' => $chunkIndex,
                'users' => $chunk,
                'total_chunks' => ceil(count($users) / 100)
            ]);
            
            // 使用不同的队列实现负载均衡
            $queueName = 'import_' . ($chunkIndex % 4); // 4个导入队列
            $queueManager->push($importTask, $queueName);
        }
        
        return response()->json([
            'message' => '批量导入任务已提交',
            'import_id' => $importId,
            'total_users' => count($users),
            'queued_chunks' => ceil(count($users) / 100)
        ]);
    }
}

五、监控与管理脚本

队列监控脚本 (monitor.php)

<?php
// 实时监控队列状态
$queues = ['default', 'email', 'image_processing', 'low_priority'];
$manager = new QueueManager();

while (true) {
    system('clear'); // 清屏
    
    echo "=== 队列监控面板 ===n";
    echo "时间: " . date('Y-m-d H:i:s') . "nn";
    
    foreach ($queues as $queue) {
        $stats = $manager->getStats($queue);
        echo str_pad($queue, 20) . " | ";
        echo "待处理: " . str_pad($stats['pending'], 6) . " | ";
        echo "延迟: " . str_pad($stats['delayed'], 6) . " | ";
        echo "失败: " . $stats['failed'] . "n";
    }
    
    // 绘制简单的ASCII图表
    echo "n📊 队列负载图表:n";
    foreach ($queues as $queue) {
        $stats = $manager->getStats($queue);
        $bars = min(50, $stats['pending']);
        echo str_pad($queue, 20) . " | " . str_repeat('█', $bars) . "n";
    }
    
    sleep(2); // 2秒刷新一次
}

失败任务重试脚本 (retry_failed.php)

<?php
// 重试失败队列中的任务
$manager = new QueueManager();
$queues = ['default', 'email'];

foreach ($queues as $queue) {
    $failedKey = 'failed_queue:' . $queue;
    $failedCount = $manager->redis->lLen($failedKey);
    
    echo "检查队列: {$queue}, 失败任务数: {$failedCount}n";
    
    for ($i = 0; $i redis->lIndex($failedKey, $i),
            true
        );
        
        if ($failedData && isset($failedData['task'])) {
            $task = Task::unserialize($failedData['task']);
            
            echo "重试任务: {$task->getId()}, ";
            echo "原错误: {$failedData['error']}n";
            
            // 询问是否重试
            echo "是否重试此任务? (y/n): ";
            $handle = fopen("php://stdin", "r");
            $answer = trim(fgets($handle));
            
            if (strtolower($answer) === 'y') {
                $manager->push($task, $queue);
                $manager->redis->lRem($failedKey, $failedData, 1);
                echo "✅ 任务已重新加入队列n";
            } else {
                echo "⏭️  跳过此任务n";
            }
        }
    }
}

六、部署与优化建议

🔧 Supervisor配置

[program:php-worker-default]
command=php /path/to/worker.php default
process_name=%(program_name)s_%(process_num)02d
numprocs=4                    ; 启动4个进程
autostart=true
autorestart=true
stopwaitsecs=30
user=www-data
stdout_logfile=/var/log/worker-default.log
stderr_logfile=/var/log/worker-error.log

[program:php-worker-email]
command=php /path/to/worker.php email
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true

[program:php-worker-images]
command=php /path/to/worker.php image_processing
process_name=%(program_name)s_%(process_num)02d
numprocs=8                    ; 图片处理需要更多进程
autostart=true
autorestart=true

🚀 性能优化策略

  • 1. 连接池优化 – 使用Redis连接池减少连接开销
  • 2. 批量处理 – 支持批量拉取任务,减少网络往返
  • 3. 内存管理 – 定期重启Worker进程防止内存泄漏
  • 4. 优先级队列 – 实现多级优先级满足不同业务需求
  • 5. 监控告警 – 集成Prometheus + Grafana实现可视化监控

🎯 实战总结

通过本教程,我们构建了一个完整的PHP异步任务队列系统。这个系统具备生产环境所需的核心功能:
任务序列化、延迟执行、失败重试、多队列支持、进程管理和监控。

扩展挑战:

  1. 添加任务进度追踪功能
  2. 实现分布式Worker集群
  3. 集成Web管理界面
  4. 支持任务依赖关系

PHP异步任务队列系统实战:基于Redis实现高性能后台任务处理 | PHP高级编程教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列系统实战:基于Redis实现高性能后台任务处理 | PHP高级编程教程 https://www.taomawang.com/server/php/1568.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务