免费资源下载
一、异步任务队列的核心价值
在传统的PHP同步编程模型中,耗时操作(如邮件发送、图片处理、数据报表生成)会阻塞请求响应,严重影响用户体验。异步任务队列通过将耗时任务解耦到后台执行,实现了请求的快速响应和系统资源的合理利用。
本文将构建一个基于Swoole扩展的轻量级、高性能异步任务队列系统,包含任务发布、消费、重试、监控等完整功能。
二、系统架构设计
任务队列系统架构:
1. 任务生产者(Producer) - 负责创建和发布任务
2. 任务队列(Queue) - 基于Redis的优先队列存储
3. 任务消费者(Consumer) - Swoole常驻进程消费任务
4. 监控中心(Monitor) - 实时监控队列状态和任务执行情况
三、完整代码实现
3.1 环境准备与依赖安装
# 安装Swoole扩展
pecl install swoole
# 安装Redis扩展
pecl install redis
# composer.json依赖
{
"require": {
"ext-swoole": ">=4.8.0",
"ext-redis": "*",
"monolog/monolog": "^2.0"
}
}
3.2 任务队列核心类实现
<?php
/**
* 异步任务队列核心处理器
*/
class AsyncTaskQueue
{
private $redis;
private $logger;
private $isRunning = false;
// 队列优先级定义
const PRIORITY_HIGH = 'high';
const PRIORITY_NORMAL = 'normal';
const PRIORITY_LOW = 'low';
public function __construct(string $redisHost = '127.0.0.1', int $redisPort = 6379)
{
$this->redis = new Redis();
$this->redis->connect($redisHost, $redisPort);
// 初始化日志系统
$this->logger = new MonologLogger('task_queue');
$this->logger->pushHandler(
new MonologHandlerStreamHandler('logs/task_queue.log')
);
}
/**
* 发布任务到队列
*/
public function publish(
string $taskType,
array $data,
string $priority = self::PRIORITY_NORMAL
): string {
$taskId = uniqid('task_', true);
$taskData = [
'id' => $taskId,
'type' => $taskType,
'data' => $data,
'priority' => $priority,
'created_at' => time(),
'attempts' => 0,
'max_attempts' => 3
];
// 根据优先级选择队列
$queueKey = "task_queue:{$priority}";
$encodedTask = json_encode($taskData, JSON_UNESCAPED_UNICODE);
// 使用Redis列表存储任务
$this->redis->lPush($queueKey, $encodedTask);
// 发布任务事件通知
$this->redis->publish('task_events', json_encode([
'event' => 'task_published',
'task_id' => $taskId,
'timestamp' => microtime(true)
]));
$this->logger->info("任务发布成功", [
'task_id' => $taskId,
'type' => $taskType,
'priority' => $priority
]);
return $taskId;
}
/**
* 启动任务消费者
*/
public function startConsumer(int $workerNum = 4): void
{
if ($this->isRunning) {
throw new RuntimeException('消费者已在运行中');
}
$this->isRunning = true;
$pool = new SwooleProcessPool($workerNum);
$pool->on('WorkerStart', function ($pool, $workerId) {
echo "工作进程 {$workerId} 已启动n";
// 每个工作进程独立Redis连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 进程常驻执行任务消费
while ($this->isRunning) {
$this->consumeTasks($redis, $workerId);
usleep(100000); // 100ms间隔避免CPU空转
}
});
$pool->on('WorkerStop', function ($pool, $workerId) {
$this->logger->info("工作进程停止", ['worker_id' => $workerId]);
});
// 优雅停止信号处理
pcntl_signal(SIGTERM, function () {
$this->isRunning = false;
$this->logger->info("收到停止信号,准备优雅退出");
});
$pool->start();
}
/**
* 消费任务核心逻辑
*/
private function consumeTasks(Redis $redis, int $workerId): void
{
// 按优先级顺序消费:高 -> 普通 -> 低
$priorities = [self::PRIORITY_HIGH, self::PRIORITY_NORMAL, self::PRIORITY_LOW];
foreach ($priorities as $priority) {
$queueKey = "task_queue:{$priority}";
$taskJson = $redis->rPop($queueKey);
if ($taskJson) {
$task = json_decode($taskJson, true);
$this->processTask($task, $workerId, $redis);
break; // 每次只处理一个任务
}
}
}
/**
* 任务处理逻辑
*/
private function processTask(array $task, int $workerId, Redis $redis): void
{
$taskId = $task['id'];
$startTime = microtime(true);
try {
$this->logger->info("开始处理任务", [
'task_id' => $taskId,
'worker_id' => $workerId,
'type' => $task['type']
]);
// 根据任务类型执行不同处理逻辑
$result = $this->executeTask($task);
$executionTime = round(microtime(true) - $startTime, 3);
$this->logger->info("任务处理成功", [
'task_id' => $taskId,
'execution_time' => $executionTime,
'result' => $result
]);
// 记录成功状态
$redis->hSet('task_results', $taskId, json_encode([
'status' => 'success',
'completed_at' => time(),
'execution_time' => $executionTime
]));
} catch (Exception $e) {
$this->handleTaskFailure($task, $e, $redis, $workerId);
}
}
/**
* 任务失败处理与重试
*/
private function handleTaskFailure(
array $task,
Exception $e,
Redis $redis,
int $workerId
): void {
$taskId = $task['id'];
$attempts = $task['attempts'] + 1;
$this->logger->error("任务处理失败", [
'task_id' => $taskId,
'error' => $e->getMessage(),
'attempt' => $attempts,
'worker_id' => $workerId
]);
// 检查是否达到最大重试次数
if ($attempts lPush($queueKey, json_encode($task));
$this->logger->info("任务已重新排队等待重试", [
'task_id' => $taskId,
'next_retry_at' => date('Y-m-d H:i:s', $task['next_retry_at'])
]);
} else {
// 达到最大重试次数,标记为失败
$redis->hSet('task_failures', $taskId, json_encode([
'task' => $task,
'last_error' => $e->getMessage(),
'failed_at' => time(),
'total_attempts' => $attempts
]));
$this->logger->critical("任务最终失败,已达最大重试次数", [
'task_id' => $taskId,
'max_attempts' => $task['max_attempts']
]);
}
}
/**
* 任务执行分发器
*/
private function executeTask(array $task)
{
$methodName = 'handle' . ucfirst($task['type']) . 'Task';
if (method_exists($this, $methodName)) {
return $this->$methodName($task['data']);
}
// 默认任务处理器
return $this->handleDefaultTask($task['data']);
}
/**
* 邮件发送任务处理器
*/
private function handleEmailTask(array $data): bool
{
// 模拟邮件发送逻辑
sleep(2); // 模拟耗时操作
$to = $data['to'] ?? '';
$subject = $data['subject'] ?? '无主题';
// 实际项目中这里调用邮件发送服务
// mail($to, $subject, $data['content']);
$this->logger->debug("邮件发送任务完成", [
'to' => $to,
'subject' => $subject
]);
return true;
}
/**
* 图片处理任务处理器
*/
private function handleImageProcessTask(array $data): array
{
$imagePath = $data['image_path'] ?? '';
// 模拟图片处理操作
sleep(3);
return [
'original_size' => '2.5MB',
'compressed_size' => '450KB',
'format' => 'webp',
'dimensions' => '1920x1080'
];
}
/**
* 默认任务处理器
*/
private function handleDefaultTask(array $data): bool
{
// 记录未识别的任务类型
$this->logger->warning("执行默认任务处理器", ['data' => $data]);
return true;
}
/**
* 获取队列状态统计
*/
public function getQueueStats(): array
{
$priorities = [self::PRIORITY_HIGH, self::PRIORITY_NORMAL, self::PRIORITY_LOW];
$stats = [];
foreach ($priorities as $priority) {
$queueKey = "task_queue:{$priority}";
$stats[$priority] = [
'pending' => $this->redis->lLen($queueKey),
'key' => $queueKey
];
}
return [
'queues' => $stats,
'success_count' => $this->redis->hLen('task_results'),
'failure_count' => $this->redis->hLen('task_failures'),
'timestamp' => time()
];
}
}
3.3 任务生产者示例
<?php
// 生产者示例代码
require_once 'AsyncTaskQueue.php';
$taskQueue = new AsyncTaskQueue();
// 发布高优先级邮件任务
$emailTaskId = $taskQueue->publish('email', [
'to' => 'user@example.com',
'subject' => '订单确认通知',
'content' => '您的订单已确认,订单号:20231027001'
], AsyncTaskQueue::PRIORITY_HIGH);
echo "邮件任务已发布,ID: {$emailTaskId}n";
// 发布普通优先级图片处理任务
$imageTaskId = $taskQueue->publish('imageProcess', [
'image_path' => '/uploads/original/product.jpg',
'operations' => ['resize', 'compress', 'convert']
], AsyncTaskQueue::PRIORITY_NORMAL);
echo "图片处理任务已发布,ID: {$imageTaskId}n";
// 获取队列状态
$stats = $taskQueue->getQueueStats();
print_r($stats);
3.4 启动消费者服务
<?php
// consumer.php - 消费者启动脚本
require_once 'AsyncTaskQueue.php';
// 设置进程标题
cli_set_process_title('php_task_consumer');
$taskQueue = new AsyncTaskQueue();
// 注册信号处理器
pcntl_async_signals(true);
pcntl_signal(SIGINT, function() use ($taskQueue) {
echo "收到中断信号,准备停止...n";
exit(0);
});
echo "=== 异步任务队列消费者启动 ===n";
echo "启动时间: " . date('Y-m-d H:i:s') . "n";
echo "工作进程数: 4n";
echo "按 Ctrl+C 停止服务nn";
// 启动消费者进程池
$taskQueue->startConsumer(4);
四、监控与管理界面
<?php
// monitor.php - 队列监控面板
class TaskQueueMonitor
{
private $redis;
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function renderDashboard(): string
{
$stats = $this->getDetailedStats();
$html = '<!DOCTYPE html>
<html>
<head>
<title>任务队列监控面板</title>
<meta charset="UTF-8">
</head>
<body>
<h1>异步任务队列监控面板</h1>
<div class="stats">
<h2>队列状态概览</h2>
<table border="1">
<tr>
<th>队列优先级</th>
<th>待处理任务</th>
<th>状态</th>
</tr>';
foreach ($stats['queues'] as $priority => $queueStats) {
$status = $queueStats['pending'] > 100 ? '<span style="color:red">拥堵</span>' :
($queueStats['pending'] > 50 ? '<span style="color:orange">繁忙</span>' :
'<span style="color:green">正常</span>');
$html .= "<tr>
<td>{$priority}</td>
<td>{$queueStats['pending']}</td>
<td>{$status}</td>
</tr>";
}
$html .= '</table>
<h3>执行统计</h3>
<p>成功任务: ' . $stats['success_count'] . '</p>
<p>失败任务: ' . $stats['failure_count'] . '</p>
<p>最后更新: ' . date('Y-m-d H:i:s', $stats['timestamp']) . '</p>
</div>
<h2>最近失败的任务</h2>';
$failures = $this->getRecentFailures(10);
if (!empty($failures)) {
$html .= '<ul>';
foreach ($failures as $failure) {
$html .= "<li>任务ID: {$failure['task_id']} - 错误: {$failure['error']}</li>";
}
$html .= '</ul>';
}
$html .= '</body></html>';
return $html;
}
private function getDetailedStats(): array
{
// 获取详细的统计信息
return [
'queues' => [
'high' => ['pending' => $this->redis->lLen('task_queue:high')],
'normal' => ['pending' => $this->redis->lLen('task_queue:normal')],
'low' => ['pending' => $this->redis->lLen('task_queue:low')]
],
'success_count' => $this->redis->hLen('task_results'),
'failure_count' => $this->redis->hLen('task_failures'),
'timestamp' => time()
];
}
private function getRecentFailures(int $limit): array
{
// 获取最近的失败任务
$allFailures = $this->redis->hGetAll('task_failures');
$failures = [];
$count = 0;
foreach ($allFailures as $taskId => $failureJson) {
if ($count >= $limit) break;
$failure = json_decode($failureJson, true);
$failures[] = [
'task_id' => $taskId,
'error' => $failure['last_error'] ?? '未知错误',
'failed_at' => date('Y-m-d H:i:s', $failure['failed_at'] ?? time())
];
$count++;
}
return $failures;
}
}
// 输出监控面板
$monitor = new TaskQueueMonitor();
echo $monitor->renderDashboard();
五、性能优化与生产部署建议
5.1 性能优化策略
- 连接池管理:为每个工作进程维护独立的Redis连接,避免连接竞争
- 批量任务处理:支持批量拉取任务,减少网络IO次数
- 内存优化:及时释放已处理任务的内存,避免内存泄漏
- 超时控制:为每个任务设置执行超时时间,防止僵尸任务
5.2 生产环境部署配置
# supervisor配置示例
[program:task_queue_consumer]
command=php /path/to/consumer.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to/your/app
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/task_queue.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
5.3 高可用方案
- 多节点部署:在不同服务器部署多个消费者实例
- 故障转移:使用Redis Sentinel或Cluster实现队列高可用
- 数据持久化:定期备份任务状态和失败记录
- 监控告警:集成Prometheus + Grafana实现实时监控
六、总结与扩展
本文构建的异步任务队列系统具有以下特点:
- 高性能:基于Swoole协程和Redis内存队列,支持高并发任务处理
- 可靠性:完善的失败重试机制和任务状态持久化
- 可扩展性:支持多优先级队列和动态工作进程调整
- 易监控:提供完整的监控面板和日志记录
扩展方向建议:
- 集成消息中间件(RabbitMQ/Kafka)替代Redis队列
- 添加任务依赖关系管理
- 实现分布式任务调度
- 开发Web管理界面
- 支持任务进度实时反馈
通过本教程的实现,您已经掌握了构建企业级PHP异步任务队列的核心技术。这套方案可以轻松处理日百万级别的异步任务,显著提升Web应用的响应速度和用户体验。

