免费资源下载
基于Redis构建企业级高性能后台任务处理系统
为什么需要异步任务队列?
在现代Web应用中,许多操作如邮件发送、图片处理、数据报表生成等耗时任务会阻塞用户请求,导致响应延迟。异步任务队列通过将耗时任务放入后台处理,实现请求的快速响应和系统解耦。
本教程将构建一个完整的PHP异步任务队列系统,包含任务生产者、消费者、监控和失败重试机制。
一、系统架构设计
📤 任务生产者
Web应用将任务序列化后推送到Redis队列,立即返回响应给用户。
🔄 Redis队列
使用Redis List结构存储待处理任务,Sorted Set存储延迟任务。
⚙️ Worker消费者
常驻PHP进程从队列获取任务并执行,支持多进程并发处理。
📊 监控系统
实时监控队列长度、处理速度、失败任务等关键指标。
二、核心代码实现
1. 任务基类定义 (Task.php)
<?php
abstract class Task
{
protected $id;
protected $data;
protected $attempts = 0;
protected $maxAttempts = 3;
protected $delay = 0;
protected $createdAt;
public function __construct(array $data = [])
{
$this->id = uniqid('task_', true);
$this->data = $data;
$this->createdAt = time();
}
// 抽象方法,子类必须实现
abstract public function handle(): bool;
// 任务失败时的处理
public function failed(Exception $e): void
{
error_log("Task {$this->id} failed: " . $e->getMessage());
}
// 序列化任务
public function serialize(): string
{
return json_encode([
'id' => $this->id,
'class' => get_class($this),
'data' => $this->data,
'attempts' => $this->attempts,
'max_attempts' => $this->maxAttempts,
'delay' => $this->delay,
'created_at' => $this->createdAt
], JSON_UNESCAPED_UNICODE);
}
// 反序列化任务
public static function unserialize(string $data): self
{
$decoded = json_decode($data, true);
$task = new $decoded['class']($decoded['data']);
$task->id = $decoded['id'];
$task->attempts = $decoded['attempts'];
$task->maxAttempts = $decoded['max_attempts'];
$task->delay = $decoded['delay'];
$task->createdAt = $decoded['created_at'];
return $task;
}
// Getter方法
public function getId(): string { return $this->id; }
public function getAttempts(): int { return $this->attempts; }
public function incrementAttempts(): void { $this->attempts++; }
public function canRetry(): bool { return $this->attempts maxAttempts; }
}
2. 队列管理器 (QueueManager.php)
<?php
class QueueManager
{
private $redis;
private $queueName = 'default_queue';
private $delayedQueueName = 'delayed_queue';
private $failedQueueName = 'failed_queue';
public function __construct(string $host = '127.0.0.1', int $port = 6379)
{
$this->redis = new Redis();
$this->redis->connect($host, $port);
$this->redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_NONE);
}
// 推送任务到队列
public function push(Task $task, string $queue = null): bool
{
$queue = $queue ?? $this->queueName;
if ($task->delay > 0) {
// 延迟任务使用Sorted Set存储
$score = time() + $task->delay;
return $this->redis->zAdd(
$this->delayedQueueName . ':' . $queue,
$score,
$task->serialize()
) > 0;
}
// 立即执行的任务使用List存储
return $this->redis->lPush($queue, $task->serialize()) > 0;
}
// 从队列获取任务
public function pop(string $queue = null): ?Task
{
$queue = $queue ?? $this->queueName;
// 检查延迟队列是否有到期的任务
$this->moveDelayedTasks($queue);
// 阻塞式获取任务,超时时间5秒
$result = $this->redis->brPop([$queue], 5);
if ($result === false) {
return null;
}
try {
return Task::unserialize($result[1]);
} catch (Exception $e) {
error_log("Failed to unserialize task: " . $e->getMessage());
return null;
}
}
// 将延迟任务移动到就绪队列
private function moveDelayedTasks(string $queue): void
{
$now = time();
$delayedKey = $this->delayedQueueName . ':' . $queue;
// 获取所有到期的任务
$tasks = $this->redis->zRangeByScore($delayedKey, 0, $now);
if (!empty($tasks)) {
// 使用管道批量操作
$pipe = $this->redis->multi(Redis::PIPELINE);
foreach ($tasks as $task) {
$pipe->lPush($queue, $task);
$pipe->zRem($delayedKey, $task);
}
$pipe->exec();
}
}
// 记录失败任务
public function markAsFailed(Task $task, string $error): void
{
$failedData = [
'task' => $task->serialize(),
'error' => $error,
'failed_at' => date('Y-m-d H:i:s'),
'attempts' => $task->getAttempts()
];
$this->redis->lPush(
$this->failedQueueName . ':' . ($queue ?? $this->queueName),
json_encode($failedData, JSON_UNESCAPED_UNICODE)
);
}
// 获取队列统计信息
public function getStats(string $queue = null): array
{
$queue = $queue ?? $this->queueName;
return [
'pending' => $this->redis->lLen($queue),
'delayed' => $this->redis->zCard($this->delayedQueueName . ':' . $queue),
'failed' => $this->redis->lLen($this->failedQueueName . ':' . $queue)
];
}
}
3. 具体任务示例:邮件发送任务 (SendEmailTask.php)
<?php
class SendEmailTask extends Task
{
private $to;
private $subject;
private $body;
private $headers;
public function __construct(array $data)
{
parent::__construct($data);
$this->to = $data['to'] ?? '';
$this->subject = $data['subject'] ?? '';
$this->body = $data['body'] ?? '';
$this->headers = $data['headers'] ?? [];
// 邮件任务最多重试2次
$this->maxAttempts = 2;
}
public function handle(): bool
{
// 模拟邮件发送过程
echo "Sending email to: {$this->to}n";
echo "Subject: {$this->subject}n";
// 模拟网络延迟
sleep(rand(1, 3));
// 模拟90%的成功率
if (rand(1, 10) %s",
date('Y-m-d H:i:s'),
$this->to,
$e->getMessage()
);
file_put_contents('email_failures.log', $logMessage . PHP_EOL, FILE_APPEND);
}
}
三、Worker进程实现
Worker守护进程 (worker.php)
<?php
require_once 'Task.php';
require_once 'QueueManager.php';
require_once 'SendEmailTask.php';
class Worker
{
private $queueManager;
private $shouldStop = false;
private $processedCount = 0;
private $startTime;
public function __construct()
{
$this->queueManager = new QueueManager();
$this->startTime = time();
// 注册信号处理器,实现优雅退出
pcntl_async_signals(true);
pcntl_signal(SIGTERM, [$this, 'stop']);
pcntl_signal(SIGINT, [$this, 'stop']);
}
public function start(string $queue = null): void
{
echo "Worker started at " . date('Y-m-d H:i:s') . "n";
echo "Listening on queue: " . ($queue ?? 'default') . "n";
while (!$this->shouldStop) {
$this->processNextTask($queue);
// 每处理100个任务输出一次状态
if ($this->processedCount % 100 === 0 && $this->processedCount > 0) {
$this->printStats($queue);
}
}
echo "Worker stopped gracefully.n";
}
private function processNextTask(string $queue = null): void
{
$task = $this->queueManager->pop($queue);
if ($task === null) {
// 队列为空,短暂休眠避免CPU空转
usleep(100000); // 100ms
return;
}
try {
echo "Processing task: {$task->getId()}n";
$task->incrementAttempts();
$success = $task->handle();
if ($success) {
echo "Task {$task->getId()} completed successfully.n";
$this->processedCount++;
} else {
throw new Exception("Task handle() returned false");
}
} catch (Exception $e) {
echo "Task {$task->getId()} failed: " . $e->getMessage() . "n";
if ($task->canRetry()) {
// 重试任务,增加延迟时间(指数退避)
$retryDelay = pow(2, $task->getAttempts() - 1) * 60; // 1m, 2m, 4m...
$task->delay = $retryDelay;
$this->queueManager->push($task, $queue);
echo "Task scheduled for retry in {$retryDelay} seconds.n";
} else {
// 记录到失败队列
$this->queueManager->markAsFailed($task, $e->getMessage());
$task->failed($e);
echo "Task moved to failed queue after {$task->getAttempts()} attempts.n";
}
}
}
private function printStats(string $queue = null): void
{
$stats = $this->queueManager->getStats($queue);
$uptime = time() - $this->startTime;
echo "n=== Worker Stats ===n";
echo "Uptime: " . gmdate("H:i:s", $uptime) . "n";
echo "Processed: {$this->processedCount}n";
echo "Pending: {$stats['pending']}n";
echo "Delayed: {$stats['delayed']}n";
echo "Failed: {$stats['failed']}n";
echo "===================nn";
}
public function stop(): void
{
echo "nReceived stop signal, finishing current task...n";
$this->shouldStop = true;
}
}
// 启动Worker
if (php_sapi_name() === 'cli') {
$worker = new Worker();
$queue = $argv[1] ?? null;
$worker->start($queue);
} else {
die("This script must be run from command line.n");
}
四、Web应用集成示例
在Laravel控制器中使用任务队列
<?php
namespace AppHttpControllers;
use AppJobsSendWelcomeEmail;
use AppJobsProcessUserUpload;
use AppJobsGenerateDataReport;
use IlluminateHttpRequest;
class UserController extends Controller
{
public function register(Request $request)
{
// 验证用户输入
$validated = $request->validate([
'name' => 'required|string|max:255',
'email' => 'required|email|unique:users',
'password' => 'required|min:8',
]);
// 同步操作:创建用户记录
$user = User::create($validated);
// 异步任务1:发送欢迎邮件(立即执行)
$emailTask = new SendEmailTask([
'to' => $user->email,
'subject' => '欢迎注册我们的服务',
'body' => $this->generateWelcomeEmail($user->name),
'template' => 'welcome'
]);
$queueManager = new QueueManager();
$queueManager->push($emailTask);
// 异步任务2:处理用户头像(延迟5分钟执行)
if ($request->hasFile('avatar')) {
$avatarTask = new ProcessImageTask([
'user_id' => $user->id,
'image_path' => $request->file('avatar')->store('temp'),
'sizes' => ['small', 'medium', 'large']
]);
$avatarTask->delay = 300; // 5分钟延迟
$queueManager->push($avatarTask, 'image_processing');
}
// 异步任务3:生成初始数据报告(低优先级队列)
$reportTask = new GenerateReportTask([
'user_id' => $user->id,
'report_type' => 'initial_analysis',
'time_range' => 'all'
]);
$queueManager->push($reportTask, 'low_priority');
// 立即返回响应,不等待任务完成
return response()->json([
'message' => '注册成功,欢迎邮件已发送',
'user_id' => $user->id,
'queued_tasks' => 3
], 201);
}
public function batchImport(Request $request)
{
// 批量导入用户,使用任务队列处理
$users = $request->input('users');
$importId = uniqid('import_');
foreach (array_chunk($users, 100) as $chunkIndex => $chunk) {
$importTask = new BatchImportTask([
'import_id' => $importId,
'chunk_index' => $chunkIndex,
'users' => $chunk,
'total_chunks' => ceil(count($users) / 100)
]);
// 使用不同的队列实现负载均衡
$queueName = 'import_' . ($chunkIndex % 4); // 4个导入队列
$queueManager->push($importTask, $queueName);
}
return response()->json([
'message' => '批量导入任务已提交',
'import_id' => $importId,
'total_users' => count($users),
'queued_chunks' => ceil(count($users) / 100)
]);
}
}
五、监控与管理脚本
队列监控脚本 (monitor.php)
<?php
// 实时监控队列状态
$queues = ['default', 'email', 'image_processing', 'low_priority'];
$manager = new QueueManager();
while (true) {
system('clear'); // 清屏
echo "=== 队列监控面板 ===n";
echo "时间: " . date('Y-m-d H:i:s') . "nn";
foreach ($queues as $queue) {
$stats = $manager->getStats($queue);
echo str_pad($queue, 20) . " | ";
echo "待处理: " . str_pad($stats['pending'], 6) . " | ";
echo "延迟: " . str_pad($stats['delayed'], 6) . " | ";
echo "失败: " . $stats['failed'] . "n";
}
// 绘制简单的ASCII图表
echo "n📊 队列负载图表:n";
foreach ($queues as $queue) {
$stats = $manager->getStats($queue);
$bars = min(50, $stats['pending']);
echo str_pad($queue, 20) . " | " . str_repeat('█', $bars) . "n";
}
sleep(2); // 2秒刷新一次
}
失败任务重试脚本 (retry_failed.php)
<?php
// 重试失败队列中的任务
$manager = new QueueManager();
$queues = ['default', 'email'];
foreach ($queues as $queue) {
$failedKey = 'failed_queue:' . $queue;
$failedCount = $manager->redis->lLen($failedKey);
echo "检查队列: {$queue}, 失败任务数: {$failedCount}n";
for ($i = 0; $i redis->lIndex($failedKey, $i),
true
);
if ($failedData && isset($failedData['task'])) {
$task = Task::unserialize($failedData['task']);
echo "重试任务: {$task->getId()}, ";
echo "原错误: {$failedData['error']}n";
// 询问是否重试
echo "是否重试此任务? (y/n): ";
$handle = fopen("php://stdin", "r");
$answer = trim(fgets($handle));
if (strtolower($answer) === 'y') {
$manager->push($task, $queue);
$manager->redis->lRem($failedKey, $failedData, 1);
echo "✅ 任务已重新加入队列n";
} else {
echo "⏭️ 跳过此任务n";
}
}
}
}
六、部署与优化建议
🔧 Supervisor配置
[program:php-worker-default] command=php /path/to/worker.php default process_name=%(program_name)s_%(process_num)02d numprocs=4 ; 启动4个进程 autostart=true autorestart=true stopwaitsecs=30 user=www-data stdout_logfile=/var/log/worker-default.log stderr_logfile=/var/log/worker-error.log [program:php-worker-email] command=php /path/to/worker.php email process_name=%(program_name)s_%(process_num)02d numprocs=2 autostart=true autorestart=true [program:php-worker-images] command=php /path/to/worker.php image_processing process_name=%(program_name)s_%(process_num)02d numprocs=8 ; 图片处理需要更多进程 autostart=true autorestart=true
🚀 性能优化策略
- 1. 连接池优化 – 使用Redis连接池减少连接开销
- 2. 批量处理 – 支持批量拉取任务,减少网络往返
- 3. 内存管理 – 定期重启Worker进程防止内存泄漏
- 4. 优先级队列 – 实现多级优先级满足不同业务需求
- 5. 监控告警 – 集成Prometheus + Grafana实现可视化监控
🎯 实战总结
通过本教程,我们构建了一个完整的PHP异步任务队列系统。这个系统具备生产环境所需的核心功能:
任务序列化、延迟执行、失败重试、多队列支持、进程管理和监控。
扩展挑战:
- 添加任务进度追踪功能
- 实现分布式Worker集群
- 集成Web管理界面
- 支持任务依赖关系

