免费资源下载
作者:PHP架构师 | 发布日期:2023年10月
一、异步任务系统概述
在现代Web应用中,异步任务处理已成为提升系统性能的关键技术。传统的同步处理方式在面对耗时操作时(如邮件发送、图片处理、数据报表生成等)会导致请求阻塞,严重影响用户体验。本文将介绍一种基于Redis的PHP异步延迟任务系统,能够实现任务的延迟执行、失败重试和优先级管理。
1.1 为什么选择Redis作为任务队列
- 高性能:Redis内存操作,读写速度极快
- 丰富的数据结构:有序集合(zset)天然支持延迟队列
- 持久化支持:RDB/AOF保证任务不丢失
- 原子操作:Lua脚本保证操作一致性
二、系统架构设计
+----------------+ +----------------+ +----------------+
| 任务生产者 |---->| Redis队列 |---->| 任务消费者 |
| (Producer) | | (Redis) | | (Consumer) |
+----------------+ +----------------+ +----------------+
| | |
+----------------+ +----------------+ +----------------+
| Web应用层 | | 延迟有序集合 | | Worker进程 |
| | | (ZSET) | | |
+----------------+ +----------------+ +----------------+
2.1 核心组件说明
任务生产者:负责创建任务,将任务序列化后存入Redis
Redis存储结构:
- 延迟队列:使用ZSET存储,score为执行时间戳
- 就绪队列:使用LIST存储,等待执行的任务
- 失败队列:使用LIST存储,执行失败的任务
任务消费者:守护进程,从队列中获取并执行任务
三、核心代码实现
3.1 任务类定义
<?php
/**
* 异步任务基类
*/
abstract class AsyncTask
{
protected $id;
protected $data;
protected $executeAt;
protected $maxRetries = 3;
protected $retryCount = 0;
protected $priority = 1; // 1-10,数值越大优先级越高
public function __construct(array $data, int $delaySeconds = 0)
{
$this->id = uniqid('task_', true);
$this->data = $data;
$this->executeAt = time() + $delaySeconds;
}
abstract public function execute(): bool;
public function toArray(): array
{
return [
'id' => $this->id,
'class' => get_class($this),
'data' => $this->data,
'execute_at' => $this->executeAt,
'priority' => $this->priority,
'max_retries' => $this->maxRetries,
'retry_count' => $this->retryCount
];
}
public static function fromArray(array $data): self
{
$task = new static($data['data']);
$task->id = $data['id'];
$task->executeAt = $data['execute_at'];
$task->priority = $data['priority'];
$task->maxRetries = $data['max_retries'];
$task->retryCount = $data['retry_count'];
return $task;
}
}
/**
* 邮件发送任务示例
*/
class EmailTask extends AsyncTask
{
public function execute(): bool
{
$data = $this->data;
// 模拟邮件发送逻辑
$success = $this->sendEmail(
$data['to'],
$data['subject'],
$data['body']
);
if (!$success && $this->retryCount maxRetries) {
$this->retryCount++;
return false; // 触发重试
}
return $success;
}
private function sendEmail($to, $subject, $body): bool
{
// 实际邮件发送逻辑
// 这里简化为随机成功/失败
return rand(0, 1) === 1;
}
}
?>
3.2 队列管理器实现
<?php
/**
* Redis队列管理器
*/
class RedisQueueManager
{
private $redis;
private $queueName;
// 队列键名定义
const KEY_DELAYED_QUEUE = 'async_tasks:delayed';
const KEY_READY_QUEUE = 'async_tasks:ready';
const KEY_FAILED_QUEUE = 'async_tasks:failed';
const KEY_PROCESSING_QUEUE = 'async_tasks:processing';
public function __construct(Redis $redis, string $queueName = 'default')
{
$this->redis = $redis;
$this->queueName = $queueName;
}
/**
* 发布延迟任务
*/
public function publishDelayedTask(AsyncTask $task): bool
{
$taskData = json_encode($task->toArray());
$score = $task->executeAt;
return $this->redis->zAdd(
self::KEY_DELAYED_QUEUE . ':' . $this->queueName,
$score,
$taskData
) > 0;
}
/**
* 将到期的延迟任务转移到就绪队列
*/
public function moveExpiredTasks(): int
{
$now = time();
$delayedKey = self::KEY_DELAYED_QUEUE . ':' . $this->queueName;
$readyKey = self::KEY_READY_QUEUE . ':' . $this->queueName;
// 使用Lua脚本保证原子性
$lua = <<<LUA
local delayed_key = KEYS[1]
local ready_key = KEYS[2]
local max_time = ARGV[1]
local limit = ARGV[2]
local tasks = redis.call('zRangeByScore', delayed_key, 0, max_time, 'LIMIT', 0, limit)
if #tasks > 0 then
redis.call('zRemRangeByScore', delayed_key, 0, max_time)
for i, task in ipairs(tasks) do
redis.call('rPush', ready_key, task)
end
end
return #tasks
LUA;
$result = $this->redis->eval(
$lua,
[$delayedKey, $readyKey, $now, 100],
2
);
return $result ?: 0;
}
/**
* 消费任务
*/
public function consume(int $timeout = 5): ?AsyncTask
{
$readyKey = self::KEY_READY_QUEUE . ':' . $this->queueName;
$processingKey = self::KEY_PROCESSING_QUEUE . ':' . $this->queueName;
// BRPOPLPUSH 原子操作,防止任务丢失
$taskData = $this->redis->brpoplpush(
$readyKey,
$processingKey,
$timeout
);
if (!$taskData) {
return null;
}
$taskArray = json_decode($taskData, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->moveToFailedQueue($taskData, 'JSON解析失败');
return null;
}
if (!class_exists($taskArray['class'])) {
$this->moveToFailedQueue($taskData, '任务类不存在');
return null;
}
return call_user_func([$taskArray['class'], 'fromArray'], $taskArray);
}
/**
* 确认任务完成
*/
public function ack(string $taskId): bool
{
$processingKey = self::KEY_PROCESSING_QUEUE . ':' . $this->queueName;
// 从处理队列中移除
return $this->redis->lRem($processingKey, json_encode(['id' => $taskId]), 1) > 0;
}
/**
* 任务失败处理
*/
public function handleFailure(AsyncTask $task, string $error): void
{
$taskData = json_encode($task->toArray());
$failedKey = self::KEY_FAILED_QUEUE . ':' . $this->queueName;
$failedItem = [
'task' => $taskData,
'error' => $error,
'failed_at' => time(),
'retry_count' => $task->retryCount
];
$this->redis->rPush($failedKey, json_encode($failedItem));
// 如果还有重试机会,重新加入延迟队列
if ($task->retryCount maxRetries) {
$task->executeAt = time() + pow(2, $task->retryCount) * 60; // 指数退避
$this->publishDelayedTask($task);
}
}
private function moveToFailedQueue(string $taskData, string $error): void
{
$failedKey = self::KEY_FAILED_QUEUE . ':' . $this->queueName;
$failedItem = [
'task' => $taskData,
'error' => $error,
'failed_at' => time()
];
$this->redis->rPush($failedKey, json_encode($failedItem));
}
}
?>
3.3 Worker守护进程
<?php
/**
* Worker守护进程
*/
class TaskWorker
{
private $queueManager;
private $isRunning = false;
private $maxJobs = 1000;
private $processedJobs = 0;
public function __construct(RedisQueueManager $queueManager)
{
$this->queueManager = $queueManager;
pcntl_async_signals(true);
// 注册信号处理器
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
pcntl_signal(SIGINT, [$this, 'handleSignal']);
}
public function start(): void
{
$this->isRunning = true;
$this->log('Worker启动');
while ($this->isRunning && $this->processedJobs maxJobs) {
try {
// 1. 转移到期任务
$moved = $this->queueManager->moveExpiredTasks();
if ($moved > 0) {
$this->log("转移了 {$moved} 个到期任务到就绪队列");
}
// 2. 消费任务
$task = $this->queueManager->consume(1);
if ($task) {
$this->processTask($task);
$this->processedJobs++;
}
// 3. 防止CPU占用过高
usleep(100000); // 100ms
} catch (Exception $e) {
$this->log("处理异常: " . $e->getMessage());
sleep(5); // 异常后等待5秒
}
}
$this->log('Worker停止');
}
private function processTask(AsyncTask $task): void
{
$this->log("开始处理任务: {$task->id}");
try {
$startTime = microtime(true);
$success = $task->execute();
$elapsed = round((microtime(true) - $startTime) * 1000, 2);
if ($success) {
$this->queueManager->ack($task->id);
$this->log("任务 {$task->id} 执行成功,耗时 {$elapsed}ms");
} else {
$this->queueManager->handleFailure($task, '任务执行失败');
$this->log("任务 {$task->id} 执行失败,已加入重试队列");
}
} catch (Throwable $e) {
$this->queueManager->handleFailure($task, $e->getMessage());
$this->log("任务 {$task->id} 执行异常: " . $e->getMessage());
}
}
public function handleSignal(int $signal): void
{
$this->log("收到信号 {$signal},准备停止...");
$this->isRunning = false;
}
private function log(string $message): void
{
$timestamp = date('Y-m-d H:i:s');
echo "[{$timestamp}] {$message}n";
}
}
// 使用示例
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$queueManager = new RedisQueueManager($redis, 'email_queue');
$worker = new TaskWorker($queueManager);
// 生产任务示例
$emailTask = new EmailTask([
'to' => 'user@example.com',
'subject' => '欢迎邮件',
'body' => '感谢注册我们的服务!'
], 60); // 60秒后执行
$queueManager->publishDelayedTask($emailTask);
// 启动worker(在实际环境中应作为守护进程运行)
// $worker->start();
?>
四、高级特性扩展
4.1 优先级队列实现
<?php
/**
* 支持优先级的队列管理器
*/
class PriorityQueueManager extends RedisQueueManager
{
public function publishDelayedTask(AsyncTask $task): bool
{
$taskData = json_encode($task->toArray());
$score = $task->executeAt;
// 将优先级信息编码到score中
// 格式:执行时间戳.优先级(补零到2位)
$priorityScore = sprintf('%d.%02d', $score, 10 - $task->priority);
return $this->redis->zAdd(
self::KEY_DELAYED_QUEUE . ':' . $this->queueName,
$priorityScore,
$taskData
) > 0;
}
}
?>
4.2 任务结果回调机制
<?php
/**
* 支持回调的任务类
*/
trait TaskCallbackTrait
{
private $successCallback;
private $failureCallback;
public function onSuccess(callable $callback): self
{
$this->successCallback = $callback;
return $this;
}
public function onFailure(callable $callback): self
{
$this->failureCallback = $callback;
return $this;
}
protected function triggerSuccess($result): void
{
if ($this->successCallback) {
call_user_func($this->successCallback, $result, $this);
}
}
protected function triggerFailure(string $error): void
{
if ($this->failureCallback) {
call_user_func($this->failureCallback, $error, $this);
}
}
}
?>
五、监控与维护
5.1 队列状态监控
<?php
/**
* 队列监控器
*/
class QueueMonitor
{
private $redis;
public function getQueueStats(string $queueName): array
{
$prefixes = [
'delayed' => RedisQueueManager::KEY_DELAYED_QUEUE,
'ready' => RedisQueueManager::KEY_READY_QUEUE,
'failed' => RedisQueueManager::KEY_FAILED_QUEUE,
'processing' => RedisQueueManager::KEY_PROCESSING_QUEUE
];
$stats = [];
foreach ($prefixes as $key => $prefix) {
$fullKey = $prefix . ':' . $queueName;
if ($key === 'delayed') {
$stats[$key] = $this->redis->zCard($fullKey);
} else {
$stats[$key] = $this->redis->lLen($fullKey);
}
}
return [
'queue_name' => $queueName,
'stats' => $stats,
'timestamp' => time(),
'memory_usage' => $this->redis->info('memory')['used_memory'] ?? 0
];
}
public function getFailedTasks(string $queueName, int $limit = 100): array
{
$failedKey = RedisQueueManager::KEY_FAILED_QUEUE . ':' . $queueName;
$tasks = $this->redis->lRange($failedKey, 0, $limit - 1);
return array_map(function($task) {
$data = json_decode($task, true);
return [
'failed_at' => date('Y-m-d H:i:s', $data['failed_at']),
'error' => $data['error'],
'retry_count' => $data['retry_count'] ?? 0,
'task_data' => json_decode($data['task'], true)
];
}, $tasks);
}
}
?>
5.2 Supervisor配置
; /etc/supervisor/conf.d/async-worker.conf
[program:async-worker]
command=php /path/to/worker.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
startretries=3
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/async-worker.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
5.3 性能优化建议
- 连接池:使用Redis连接池减少连接开销
- 管道技术:批量操作使用pipeline减少网络往返
- Lua脚本:复杂操作用Lua保证原子性
- 内存优化:定期清理已完成任务数据
- 监控告警:设置队列积压告警阈值

