免费资源下载
一、异步任务队列的必要性
在现代Web应用中,用户请求往往需要处理耗时操作,如发送邮件、生成报表、处理图片等。如果这些操作同步执行,会导致用户等待时间过长,影响用户体验。异步任务队列通过将耗时任务放入队列,由后台进程异步处理,实现了请求的快速响应。
本文将详细讲解如何使用PHP和Redis构建一个完整的异步任务队列系统,包含任务发布、消费、重试、失败处理等完整功能。
二、系统架构设计
我们的异步队列系统包含以下核心组件:
- 任务生产者(Producer):负责创建和发布任务到Redis队列
- Redis存储:使用List数据结构存储待处理任务,Sorted Set处理延迟任务
- 任务消费者(Consumer):守护进程,持续从队列获取并执行任务
- 监控系统:实时监控队列状态和任务执行情况
+-------------+ +-------------+ +-------------+
| Web应用 |---->| Redis |---->| Worker |
| (生产者) | | (队列) | | (消费者) |
+-------------+ +-------------+ +-------------+
三、完整代码实现
3.1 Redis队列管理器
<?php
/**
* Redis任务队列管理器
*/
class RedisQueueManager
{
private $redis;
private $queueName = 'async_tasks';
private $delayQueueName = 'delay_tasks';
public function __construct($config = [])
{
$this->redis = new Redis();
$this->redis->connect(
$config['host'] ?? '127.0.0.1',
$config['port'] ?? 6379
);
if (!empty($config['password'])) {
$this->redis->auth($config['password']);
}
if (!empty($config['database'])) {
$this->redis->select($config['database']);
}
}
/**
* 发布即时任务
*/
public function publish($taskType, $data, $priority = 'normal')
{
$task = [
'id' => uniqid('task_', true),
'type' => $taskType,
'data' => $data,
'created_at' => time(),
'priority' => $priority,
'attempts' => 0,
'max_attempts' => 3
];
$jsonTask = json_encode($task, JSON_UNESCAPED_UNICODE);
// 根据优先级选择队列
if ($priority === 'high') {
return $this->redis->lPush($this->queueName . ':high', $jsonTask);
} else {
return $this->redis->lPush($this->queueName, $jsonTask);
}
}
/**
* 发布延迟任务
*/
public function publishDelay($taskType, $data, $delaySeconds)
{
$task = [
'id' => uniqid('delay_', true),
'type' => $taskType,
'data' => $data,
'created_at' => time(),
'execute_at' => time() + $delaySeconds
];
$jsonTask = json_encode($task, JSON_UNESCAPED_UNICODE);
return $this->redis->zAdd(
$this->delayQueueName,
time() + $delaySeconds,
$jsonTask
);
}
/**
* 消费任务
*/
public function consume($timeout = 5)
{
// 先检查延迟队列中是否有到期的任务
$this->moveDelayedTasks();
// 优先处理高优先级队列
$task = $this->redis->rPop($this->queueName . ':high');
if (!$task) {
$task = $this->redis->brPop([$this->queueName], $timeout);
if ($task) {
$task = $task[1]; // brPop返回数组
}
}
return $task ? json_decode($task, true) : null;
}
/**
* 将到期的延迟任务移到主队列
*/
private function moveDelayedTasks()
{
$now = time();
$tasks = $this->redis->zRangeByScore(
$this->delayQueueName,
0,
$now
);
if (!empty($tasks)) {
foreach ($tasks as $task) {
$this->redis->lPush($this->queueName, $task);
$this->redis->zRem($this->delayQueueName, $task);
}
}
}
/**
* 获取队列状态
*/
public function getQueueStats()
{
return [
'pending_normal' => $this->redis->lLen($this->queueName),
'pending_high' => $this->redis->lLen($this->queueName . ':high'),
'pending_delayed' => $this->redis->zCard($this->delayQueueName)
];
}
}
3.2 任务处理器基类
<?php
/**
* 任务处理器抽象基类
*/
abstract class TaskHandler
{
protected $maxRetries = 3;
protected $retryDelay = 60; // 重试延迟秒数
abstract public function handle($taskData);
public function failed($taskData, $exception)
{
// 记录失败日志
error_log(sprintf(
"任务处理失败: %s, 错误: %s",
json_encode($taskData),
$exception->getMessage()
));
// 可以发送告警通知
$this->sendAlert($taskData, $exception);
}
protected function sendAlert($taskData, $exception)
{
// 实现告警逻辑,如发送邮件、Slack通知等
}
}
/**
* 邮件发送任务处理器
*/
class EmailTaskHandler extends TaskHandler
{
public function handle($taskData)
{
$to = $taskData['to'];
$subject = $taskData['subject'];
$body = $taskData['body'];
// 模拟邮件发送
$success = $this->sendEmail($to, $subject, $body);
if (!$success) {
throw new Exception('邮件发送失败');
}
return true;
}
private function sendEmail($to, $subject, $body)
{
// 实际邮件发送逻辑
// 这里使用模拟实现
sleep(1); // 模拟耗时操作
return true;
}
}
/**
* 图片处理任务处理器
*/
class ImageProcessHandler extends TaskHandler
{
public function handle($taskData)
{
$imagePath = $taskData['path'];
$operations = $taskData['operations'];
foreach ($operations as $operation) {
switch ($operation['type']) {
case 'resize':
$this->resizeImage($imagePath, $operation['width'], $operation['height']);
break;
case 'watermark':
$this->addWatermark($imagePath, $operation['text']);
break;
case 'compress':
$this->compressImage($imagePath, $operation['quality']);
break;
}
}
return true;
}
private function resizeImage($path, $width, $height)
{
// 图片缩放实现
sleep(2); // 模拟耗时操作
}
private function addWatermark($path, $text)
{
// 添加水印实现
sleep(1);
}
private function compressImage($path, $quality)
{
// 图片压缩实现
sleep(1);
}
}
3.3 消费者守护进程
<?php
/**
* 队列消费者守护进程
*/
class QueueConsumerDaemon
{
private $queueManager;
private $handlers = [];
private $running = false;
public function __construct(RedisQueueManager $queueManager)
{
$this->queueManager = $queueManager;
}
public function registerHandler($taskType, TaskHandler $handler)
{
$this->handlers[$taskType] = $handler;
}
public function start($maxTasks = null)
{
$this->running = true;
$processed = 0;
pcntl_signal(SIGTERM, [$this, 'shutdown']);
pcntl_signal(SIGINT, [$this, 'shutdown']);
echo "消费者守护进程启动...n";
while ($this->running) {
// 检查信号
pcntl_signal_dispatch();
$task = $this->queueManager->consume();
if ($task) {
$this->processTask($task);
$processed++;
// 如果设置了最大处理任务数
if ($maxTasks && $processed >= $maxTasks) {
$this->shutdown();
}
}
// 防止CPU空转
if (!$task) {
sleep(1);
}
}
echo "消费者守护进程停止n";
}
private function processTask($task)
{
$taskType = $task['type'];
if (!isset($this->handlers[$taskType])) {
error_log("未注册的任务类型: {$taskType}");
return;
}
$handler = $this->handlers[$taskType];
try {
echo "处理任务: {$task['id']} [{$taskType}]n";
$handler->handle($task['data']);
echo "任务完成: {$task['id']}n";
} catch (Exception $e) {
$task['attempts']++;
if ($task['attempts'] retryDelay;
$this->queueManager->publishDelay(
$taskType,
$task['data'],
$handler->retryDelay
);
echo "任务重试: {$task['id']} (尝试 {$task['attempts']})n";
} else {
// 最终失败
$handler->failed($task['data'], $e);
echo "任务失败: {$task['id']}n";
}
}
}
public function shutdown()
{
$this->running = false;
}
}
四、实战应用示例
4.1 生产者示例(Web应用中使用)
<?php
// 在控制器或服务中使用
class UserController
{
public function registerAction($userData)
{
// 保存用户到数据库
$userId = $this->saveUser($userData);
// 初始化队列管理器
$queue = new RedisQueueManager([
'host' => 'localhost',
'port' => 6379,
'database' => 0
]);
// 发送欢迎邮件(异步)
$queue->publish('email', [
'to' => $userData['email'],
'subject' => '欢迎注册',
'body' => '感谢您注册我们的服务!'
]);
// 延迟任务:注册24小时后发送提醒邮件
$queue->publishDelay('email', [
'to' => $userData['email'],
'subject' => '快来体验更多功能',
'body' => '您已经注册24小时了,快来体验我们的高级功能吧!'
], 86400); // 24小时后
// 处理用户头像(异步)
if (!empty($_FILES['avatar'])) {
$queue->publish('image_process', [
'path' => $_FILES['avatar']['tmp_name'],
'operations' => [
['type' => 'resize', 'width' => 200, 'height' => 200],
['type' => 'compress', 'quality' => 80]
]
], 'high'); // 高优先级
}
// 立即返回响应,无需等待任务完成
return [
'success' => true,
'message' => '注册成功,欢迎邮件已发送',
'user_id' => $userId
];
}
}
4.2 启动消费者守护进程
<?php
// consumer.php
require_once 'RedisQueueManager.php';
require_once 'TaskHandler.php';
require_once 'QueueConsumerDaemon.php';
// 初始化队列管理器
$queueManager = new RedisQueueManager([
'host' => 'localhost',
'port' => 6379
]);
// 初始化消费者
$consumer = new QueueConsumerDaemon($queueManager);
// 注册任务处理器
$consumer->registerHandler('email', new EmailTaskHandler());
$consumer->registerHandler('image_process', new ImageProcessHandler());
// 启动消费者(无限运行)
$consumer->start();
// 或者指定最大处理任务数(适合测试)
// $consumer->start(100); // 处理100个任务后停止
4.3 监控队列状态
<?php
// monitor.php
$queueManager = new RedisQueueManager();
// 获取队列统计信息
$stats = $queueManager->getQueueStats();
echo "队列状态监控:n";
echo "普通待处理任务: {$stats['pending_normal']}n";
echo "高优先级任务: {$stats['pending_high']}n";
echo "延迟任务: {$stats['pending_delayed']}n";
// 可以集成到管理后台或监控系统
if ($stats['pending_normal'] > 1000) {
// 发送告警
mail('admin@example.com', '队列积压告警', '普通任务队列积压超过1000');
}
五、高级特性与优化
5.1 多进程消费
为了提高消费能力,可以启动多个消费者进程:
<?php
// multi_consumer.php
$maxWorkers = 4; // 根据CPU核心数调整
for ($i = 0; $i registerHandler('email', new EmailTaskHandler());
$consumer->start();
exit(0);
}
}
// 等待所有子进程
while (pcntl_waitpid(0, $status) != -1) {
$status = pcntl_wexitstatus($status);
echo "子进程退出,状态: {$status}n";
}
5.2 任务优先级与延迟队列
系统支持三种队列类型:
- 高优先级队列:立即处理的重要任务
- 普通队列:常规异步任务
- 延迟队列:定时执行的任务
5.3 失败重试机制
每个任务支持配置最大重试次数,失败的任务会延迟重试,避免立即重试造成的资源浪费。
六、总结与最佳实践
6.1 性能优化建议
- 连接池:在生产环境中使用Redis连接池
- 序列化优化:对于大型数据,考虑使用MessagePack替代JSON
- 监控告警:实现完整的监控和告警系统
- 优雅停机:消费者应支持优雅停机,处理完当前任务后再退出
6.2 扩展方向
- 添加任务进度追踪功能
- 实现任务依赖关系
- 添加任务取消机制
- 集成到Laravel、Symfony等框架
6.3 生产环境部署
建议使用Supervisor管理消费者进程:
; /etc/supervisor/conf.d/queue-worker.conf
[program:queue-worker]
command=php /path/to/consumer.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/queue-worker.log

