免费资源下载
原创技术教程 | 最后更新:2023年10月
一、异步任务队列的核心价值
在现代Web应用中,用户请求的即时响应至关重要。然而,某些操作如邮件发送、图片处理、数据报表生成等耗时任务会阻塞请求流程。异步任务队列通过将耗时操作从主请求流程中解耦,显著提升系统响应速度和吞吐量。
传统同步处理的问题:
- 用户等待时间过长,体验差
- 并发处理能力受限
- 单点故障导致任务丢失
- 资源利用率不均衡
异步队列的优势:
- 请求响应时间从秒级降至毫秒级
- 支持任务重试和失败处理
- 实现削峰填谷,提高系统稳定性
- 便于横向扩展
二、系统架构设计
我们设计一个基于Redis的轻量级任务队列系统,包含以下核心组件:
2.1 架构图
+----------------+ +---------------+ +-------------------+
| Web应用 |---->| Redis队列 |---->| Worker进程 |
| (生产者) | | (消息中间件) | | (消费者) |
+----------------+ +---------------+ +-------------------+
| | |
| 提交任务 | 存储任务 | 执行任务
v v v
+----------------+ +---------------+ +-------------------+
| 任务状态查询 |<----| 任务状态存储 |<----| 任务结果回写 |
+----------------+ +---------------+ +-------------------+
2.2 队列设计
- 等待队列:存放待处理任务
- 处理中队列:正在执行的任务
- 完成队列:已成功完成的任务
- 失败队列:执行失败的任务
三、完整代码实现
3.1 环境要求
PHP >= 7.4
Redis >= 5.0
Composer (用于安装predis/predis)
3.2 安装依赖
composer require predis/predis
3.3 核心队列类实现
<?php
/**
* Redis异步任务队列核心类
*/
class AsyncTaskQueue
{
private $redis;
private $queuePrefix = 'async_queue:';
public function __construct($config = [])
{
$defaultConfig = [
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
'password' => null,
'timeout' => 5.0
];
$config = array_merge($defaultConfig, $config);
$this->redis = new PredisClient([
'scheme' => 'tcp',
'host' => $config['host'],
'port' => $config['port'],
'database' => $config['database'],
'password' => $config['password'],
'timeout' => $config['timeout']
]);
}
/**
* 提交任务到队列
*/
public function push(string $queueName, array $task, int $delay = 0): string
{
$taskId = uniqid('task_', true);
$taskData = [
'id' => $taskId,
'data' => $task,
'created_at' => time(),
'attempts' => 0,
'status' => 'pending',
'queue' => $queueName
];
$taskKey = $this->queuePrefix . 'task:' . $taskId;
// 存储任务详情
$this->redis->hmset($taskKey, $taskData);
if ($delay > 0) {
// 延迟任务
$score = time() + $delay;
$this->redis->zadd($this->queuePrefix . 'delayed:' . $queueName, [$taskId => $score]);
} else {
// 立即执行任务
$this->redis->lpush($this->queuePrefix . 'waiting:' . $queueName, $taskId);
}
return $taskId;
}
/**
* 从队列获取任务
*/
public function pop(string $queueName, int $timeout = 5): ?array
{
// 检查延迟队列
$this->moveDelayedTasks($queueName);
// 从等待队列获取任务
$result = $this->redis->brpop(
$this->queuePrefix . 'waiting:' . $queueName,
$timeout
);
if (!$result) {
return null;
}
$taskId = $result[1];
// 获取任务详情
$taskData = $this->redis->hgetall($this->queuePrefix . 'task:' . $taskId);
if (empty($taskData)) {
return null;
}
// 更新任务状态
$this->redis->hmset($this->queuePrefix . 'task:' . $taskId, [
'status' => 'processing',
'started_at' => time(),
'attempts' => intval($taskData['attempts'] ?? 0) + 1
]);
// 添加到处理中队列
$this->redis->lpush($this->queuePrefix . 'processing:' . $queueName, $taskId);
return [
'id' => $taskId,
'data' => json_decode($taskData['data'] ?? '[]', true),
'queue' => $queueName
];
}
/**
* 完成任务
*/
public function complete(string $taskId, $result = null): bool
{
$taskKey = $this->queuePrefix . 'task:' . $taskId;
$taskData = $this->redis->hgetall($taskKey);
if (empty($taskData)) {
return false;
}
// 更新任务状态
$updateData = [
'status' => 'completed',
'completed_at' => time(),
'result' => json_encode($result)
];
$this->redis->hmset($taskKey, $updateData);
// 从处理中队列移除
$queueName = $taskData['queue'];
$this->redis->lrem($this->queuePrefix . 'processing:' . $queueName, 0, $taskId);
// 添加到完成队列
$this->redis->lpush($this->queuePrefix . 'completed:' . $queueName, $taskId);
// 设置过期时间(7天后自动删除)
$this->redis->expire($taskKey, 604800);
return true;
}
/**
* 任务失败处理
*/
public function fail(string $taskId, string $error, int $maxRetries = 3): bool
{
$taskKey = $this->queuePrefix . 'task:' . $taskId;
$taskData = $this->redis->hgetall($taskKey);
if (empty($taskData)) {
return false;
}
$attempts = intval($taskData['attempts'] ?? 0);
$queueName = $taskData['queue'];
if ($attempts >= $maxRetries) {
// 超过重试次数,标记为失败
$updateData = [
'status' => 'failed',
'failed_at' => time(),
'error' => $error,
'final_attempt' => $attempts
];
$this->redis->hmset($taskKey, $updateData);
// 添加到失败队列
$this->redis->lpush($this->queuePrefix . 'failed:' . $queueName, $taskId);
} else {
// 重新加入队列重试
$delay = min(300, pow(2, $attempts) * 60); // 指数退避
$this->redis->zadd(
$this->queuePrefix . 'delayed:' . $queueName,
[$taskId => time() + $delay]
);
$updateData = [
'status' => 'retrying',
'last_error' => $error,
'next_retry_at' => time() + $delay
];
$this->redis->hmset($taskKey, $updateData);
}
// 从处理中队列移除
$this->redis->lrem($this->queuePrefix . 'processing:' . $queueName, 0, $taskId);
return true;
}
/**
* 移动延迟任务到等待队列
*/
private function moveDelayedTasks(string $queueName): void
{
$delayedKey = $this->queuePrefix . 'delayed:' . $queueName;
$now = time();
// 获取所有到期的任务
$tasks = $this->redis->zrangebyscore($delayedKey, 0, $now);
if (!empty($tasks)) {
foreach ($tasks as $taskId) {
// 添加到等待队列
$this->redis->lpush($this->queuePrefix . 'waiting:' . $queueName, $taskId);
// 从延迟队列移除
$this->redis->zrem($delayedKey, $taskId);
}
}
}
/**
* 获取队列统计信息
*/
public function getStats(string $queueName): array
{
return [
'waiting' => $this->redis->llen($this->queuePrefix . 'waiting:' . $queueName),
'processing' => $this->redis->llen($this->queuePrefix . 'processing:' . $queueName),
'completed' => $this->redis->llen($this->queuePrefix . 'completed:' . $queueName),
'failed' => $this->redis->llen($this->queuePrefix . 'failed:' . $queueName),
'delayed' => $this->redis->zcard($this->queuePrefix . 'delayed:' . $queueName)
];
}
}
3.4 Worker进程实现
<?php
/**
* 任务Worker类
*/
class TaskWorker
{
private $queue;
private $handlers = [];
private $running = false;
public function __construct(AsyncTaskQueue $queue)
{
$this->queue = $queue;
}
/**
* 注册任务处理器
*/
public function registerHandler(string $taskType, callable $handler): void
{
$this->handlers[$taskType] = $handler;
}
/**
* 启动Worker
*/
public function start(string $queueName, array $options = []): void
{
$defaultOptions = [
'sleep' => 1, // 队列空时休眠时间
'max_memory' => 128 * 1024 * 1024, // 最大内存限制
'max_tasks' => 1000, // 最大处理任务数
'stop_file' => '/tmp/stop_worker' // 停止信号文件
];
$options = array_merge($defaultOptions, $options);
$this->running = true;
$processed = 0;
echo "Worker started on queue: {$queueName}n";
// 注册信号处理器
pcntl_signal(SIGTERM, [$this, 'stop']);
pcntl_signal(SIGINT, [$this, 'stop']);
while ($this->running) {
// 检查停止信号
if (file_exists($options['stop_file'])) {
echo "Stop signal detected, shutting down...n";
$this->running = false;
break;
}
// 检查内存限制
if (memory_get_usage(true) > $options['max_memory']) {
echo "Memory limit reached, restarting...n";
$this->running = false;
break;
}
// 处理任务
$task = $this->queue->pop($queueName);
if ($task === null) {
// 队列为空,休眠
sleep($options['sleep']);
continue;
}
try {
echo "Processing task: {$task['id']}n";
// 执行任务
$result = $this->executeTask($task);
// 标记任务完成
$this->queue->complete($task['id'], $result);
echo "Task completed: {$task['id']}n";
} catch (Exception $e) {
echo "Task failed: {$task['id']} - {$e->getMessage()}n";
// 标记任务失败
$this->queue->fail($task['id'], $e->getMessage());
}
$processed++;
// 检查最大任务数限制
if ($processed >= $options['max_tasks']) {
echo "Reached max tasks limit, restarting...n";
$this->running = false;
break;
}
// 处理信号
pcntl_signal_dispatch();
}
echo "Worker stopped. Processed {$processed} tasks.n";
}
/**
* 执行具体任务
*/
private function executeTask(array $task)
{
$taskData = $task['data'];
$taskType = $taskData['type'] ?? 'default';
if (!isset($this->handlers[$taskType])) {
throw new Exception("No handler registered for task type: {$taskType}");
}
return call_user_func($this->handlers[$taskType], $taskData);
}
/**
* 停止Worker
*/
public function stop(): void
{
$this->running = false;
}
}
3.5 实际应用示例
<?php
// 示例:用户注册后的异步任务处理
require_once 'AsyncTaskQueue.php';
require_once 'TaskWorker.php';
// 初始化队列
$queue = new AsyncTaskQueue([
'host' => 'localhost',
'port' => 6379
]);
// 用户注册处理函数
function handleUserRegistration($data)
{
$userId = $data['user_id'];
$email = $data['email'];
// 1. 发送欢迎邮件
sendWelcomeEmail($email, $userId);
// 2. 创建用户统计记录
createUserStatistics($userId);
// 3. 推荐系统初始化
initRecommendationSystem($userId);
// 4. 发送管理员通知
notifyAdministrator($userId);
return [
'user_id' => $userId,
'processed_at' => date('Y-m-d H:i:s'),
'steps_completed' => 4
];
}
// 邮件发送任务
function sendWelcomeEmail($email, $userId)
{
// 模拟邮件发送
sleep(2);
echo "Welcome email sent to: {$email}n";
// 实际应用中这里调用邮件发送服务
// mail($email, 'Welcome', 'Welcome to our service!');
}
// 其他处理函数...
function createUserStatistics($userId) { /* ... */ }
function initRecommendationSystem($userId) { /* ... */ }
function notifyAdministrator($userId) { /* ... */ }
// Web应用中的使用(生产者)
if ($_SERVER['REQUEST_METHOD'] === 'POST' && isset($_POST['register'])) {
// 用户注册逻辑
$userId = createUser($_POST);
// 提交异步任务
$taskId = $queue->push('user_tasks', [
'type' => 'user_registration',
'user_id' => $userId,
'email' => $_POST['email'],
'registration_time' => time()
]);
// 立即返回响应
echo json_encode([
'success' => true,
'message' => 'Registration successful',
'task_id' => $taskId,
'user_id' => $userId
]);
exit;
}
// Worker启动脚本(消费者)
if (php_sapi_name() === 'cli' && isset($argv[1]) && $argv[1] === 'worker') {
$worker = new TaskWorker($queue);
// 注册任务处理器
$worker->registerHandler('user_registration', 'handleUserRegistration');
// 启动Worker
$worker->start('user_tasks', [
'sleep' => 2,
'max_memory' => 256 * 1024 * 1024
]);
}
四、高级特性与优化
4.1 优先级队列实现
// 在AsyncTaskQueue类中添加
public function pushWithPriority(string $queueName, array $task, int $priority = 5): string
{
$taskId = uniqid('task_', true);
$taskData = [
'id' => $taskId,
'data' => $task,
'priority' => $priority,
'created_at' => time()
];
$taskKey = $this->queuePrefix . 'task:' . $taskId;
$this->redis->hmset($taskKey, $taskData);
// 使用有序集合实现优先级
$this->redis->zadd(
$this->queuePrefix . 'priority:' . $queueName,
[$taskId => $priority]
);
return $taskId;
}
public function popPriority(string $queueName): ?array
{
// 获取最高优先级的任务
$tasks = $this->redis->zrevrange(
$this->queuePrefix . 'priority:' . $queueName,
0,
0,
['WITHSCORES' => true]
);
if (empty($tasks)) {
return null;
}
$taskId = array_key_first($tasks);
$this->redis->zrem($this->queuePrefix . 'priority:' . $queueName, $taskId);
// 获取任务详情
return $this->getTask($taskId);
}
4.2 批量任务处理
public function processBatch(string $queueName, int $batchSize = 10): array
{
$tasks = [];
for ($i = 0; $i pop($queueName, 1);
if ($task) {
$tasks[] = $task;
} else {
break;
}
}
return $tasks;
}
// Worker中的批量处理
public function processBatchTasks(array $tasks): void
{
// 并行处理任务
$promises = [];
foreach ($tasks as $task) {
$promises[] = $this->executeTaskAsync($task);
}
// 等待所有任务完成
$results = wait($promises);
foreach ($results as $index => $result) {
if ($result['success']) {
$this->queue->complete($tasks[$index]['id'], $result['data']);
} else {
$this->queue->fail($tasks[$index]['id'], $result['error']);
}
}
}
4.3 任务依赖关系
public function pushWithDependencies(string $queueName, array $task, array $dependencies): string
{
$taskId = uniqid('task_', true);
// 存储依赖关系
$dependencyKey = $this->queuePrefix . 'deps:' . $taskId;
foreach ($dependencies as $depTaskId) {
$this->redis->sadd($dependencyKey, $depTaskId);
}
// 设置任务状态为等待依赖
$taskData = [
'id' => $taskId,
'data' => $task,
'status' => 'waiting_deps',
'dependencies' => json_encode($dependencies)
];
$this->redis->hmset($this->queuePrefix . 'task:' . $taskId, $taskData);
return $taskId;
}
public function checkDependencies(string $taskId): bool
{
$dependencyKey = $this->queuePrefix . 'deps:' . $taskId;
$dependencies = $this->redis->smembers($dependencyKey);
foreach ($dependencies as $depTaskId) {
$depStatus = $this->redis->hget($this->queuePrefix . 'task:' . $depTaskId, 'status');
if ($depStatus !== 'completed') {
return false;
}
}
// 所有依赖都完成,可以执行任务
$this->redis->del($dependencyKey);
return true;
}
五、监控与管理
5.1 监控面板实现
<?php
// queue_monitor.php
class QueueMonitor
{
private $queue;
public function __construct(AsyncTaskQueue $queue)
{
$this->queue = $queue;
}
public function getDashboardData(): array
{
$queues = ['default', 'high_priority', 'emails', 'reports'];
$data = [];
foreach ($queues as $queue) {
$data[$queue] = [
'stats' => $this->queue->getStats($queue),
'throughput' => $this->getThroughput($queue),
'avg_processing_time' => $this->getAvgProcessingTime($queue)
];
}
return $data;
}
public function getFailedTasks(string $queueName, int $limit = 50): array
{
$failedKey = $this->queue->getQueuePrefix() . 'failed:' . $queueName;
$taskIds = $this->queue->getRedis()->lrange($failedKey, 0, $limit - 1);
$tasks = [];
foreach ($taskIds as $taskId) {
$taskData = $this->queue->getRedis()->hgetall(
$this->queue->getQueuePrefix() . 'task:' . $taskId
);
if ($taskData) {
$tasks[] = $taskData;
}
}
return $tasks;
}
public function retryFailedTask(string $taskId): bool
{
$taskKey = $this->queue->getQueuePrefix() . 'task:' . $taskId;
$taskData = $this->queue->getRedis()->hgetall($taskKey);
if (!$taskData || ($taskData['status'] ?? '') !== 'failed') {
return false;
}
// 重新加入队列
$queueName = $taskData['queue'];
$this->queue->getRedis()->lpush(
$this->queue->getQueuePrefix() . 'waiting:' . $queueName,
$taskId
);
// 更新状态
$this->queue->getRedis()->hmset($taskKey, [
'status' => 'pending',
'retried_at' => time(),
'error' => ''
]);
// 从失败队列移除
$this->queue->getRedis()->lrem(
$this->queue->getQueuePrefix() . 'failed:' . $queueName,
0,
$taskId
);
return true;
}
}
5.2 性能优化建议
- 连接池管理:使用连接池减少Redis连接开销
- 管道化操作:批量Redis命令减少网络往返
- 内存优化:定期清理已完成的任务数据
- 持久化策略:重要任务数据定期备份
- 监控告警:设置队列长度阈值告警
5.3 生产环境部署
# Supervisor配置示例
[program:async_worker]
command=php /path/to/worker.php worker
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/worker.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
总结
本文详细介绍了基于Redis的PHP异步任务队列系统的完整实现。通过这个系统,您可以:
- 将耗时操作异步化,提升Web应用响应速度
- 实现任务的可靠执行,支持重试和失败处理
- 构建可扩展的任务处理架构
- 通过监控面板实时掌握系统状态
这个解决方案特别适合处理邮件发送、数据报表生成、图片处理、批量数据导入等后台任务场景。您可以根据实际需求进一步扩展功能,如添加任务优先级、任务依赖、分布式Worker等高级特性。

