原创技术教程 | 作者:PHP技术专家 | 发布日期:2023年10月
一、异步任务队列的技术背景
在现代Web应用中,用户请求往往需要处理耗时操作,如发送邮件、生成报表、处理图像等。如果这些操作同步执行,会导致用户等待时间过长,影响用户体验。异步任务队列通过将耗时任务放入队列,由后台工作进程异步处理,实现了请求的快速响应。
传统同步处理的局限性:
- HTTP请求超时限制(通常30秒)
- 服务器资源占用时间长
- 无法处理大量并发任务
- 单点故障影响整个系统
Redis作为队列存储的优势:
- 内存操作,读写速度快
- 支持丰富的数据结构(List、Sorted Set)
- 持久化机制保证数据安全
- 发布订阅功能支持任务通知
二、系统架构设计
我们设计一个三层架构的异步任务系统:
1. 任务生产者(Producer)
Web应用 → 创建任务 → Redis队列
2. 任务队列(Queue)
Redis List结构存储任务
支持优先级队列
任务状态追踪
3. 任务消费者(Consumer)
常驻PHP进程
多进程并发处理
失败重试机制
数据流程图:
用户请求 → 任务创建 → Redis入队 → 返回响应
后台Worker → 监听队列 → 执行任务 → 更新状态
监控系统 → 查看状态 → 错误报警 → 日志记录
三、完整代码实现
1. 环境要求
PHP 7.4+
Redis 5.0+
Composer依赖管理
PCNTL扩展(多进程支持)
2. 核心队列类实现
<?php
/**
* Redis任务队列核心类
*/
class RedisTaskQueue
{
private $redis;
private $queueKey = 'task_queue';
private $processingKey = 'task_processing';
private $failedKey = 'task_failed';
public function __construct($host = '127.0.0.1', $port = 6379)
{
$this->redis = new Redis();
$this->redis->connect($host, $port);
$this->redis->setOption(Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP);
}
/**
* 添加任务到队列
*/
public function push($task, $priority = 'normal')
{
$taskData = [
'id' => uniqid('task_', true),
'data' => $task,
'priority' => $priority,
'created_at' => time(),
'status' => 'pending'
];
$queueKey = $this->getPriorityKey($priority);
return $this->redis->lPush($queueKey, $taskData);
}
/**
* 获取任务(阻塞式)
*/
public function pop($timeout = 30)
{
// 优先处理高优先级队列
$task = $this->redis->brPop(['high_priority_queue', 'normal_queue'], $timeout);
if ($task) {
$taskData = $task[1];
$taskData['status'] = 'processing';
$taskData['started_at'] = time();
// 移动到处理中队列
$this->redis->hSet($this->processingKey, $taskData['id'], $taskData);
return $taskData;
}
return null;
}
/**
* 完成任务
*/
public function complete($taskId, $result = null)
{
$task = $this->redis->hGet($this->processingKey, $taskId);
if ($task) {
$task['status'] = 'completed';
$task['completed_at'] = time();
$task['result'] = $result;
// 保存到完成记录(可设置过期时间)
$this->redis->hSet('task_completed', $taskId, $task);
$this->redis->hDel($this->processingKey, $taskId);
return true;
}
return false;
}
/**
* 任务失败处理
*/
public function fail($taskId, $error)
{
$task = $this->redis->hGet($this->processingKey, $taskId);
if ($task) {
$task['status'] = 'failed';
$task['failed_at'] = time();
$task['error'] = $error;
$task['retry_count'] = ($task['retry_count'] ?? 0) + 1;
// 重试逻辑(最多3次)
if ($task['retry_count'] push($task['data'], $task['priority']);
} else {
$this->redis->lPush($this->failedKey, $task);
}
$this->redis->hDel($this->processingKey, $taskId);
}
}
private function getPriorityKey($priority)
{
return $priority . '_priority_queue';
}
/**
* 获取队列统计信息
*/
public function getStats()
{
return [
'pending_high' => $this->redis->lLen('high_priority_queue'),
'pending_normal' => $this->redis->lLen('normal_queue'),
'processing' => $this->redis->hLen($this->processingKey),
'failed' => $this->redis->lLen($this->failedKey)
];
}
}
?>
3. Worker工作进程实现
<?php
/**
* 任务工作进程管理器
*/
class TaskWorkerManager
{
private $queue;
private $workers = [];
private $maxWorkers = 5;
private $shutdown = false;
public function __construct(RedisTaskQueue $queue)
{
$this->queue = $queue;
pcntl_signal(SIGTERM, [$this, 'shutdown']);
pcntl_signal(SIGINT, [$this, 'shutdown']);
}
/**
* 启动工作进程
*/
public function start()
{
echo "启动任务工作进程...n";
for ($i = 0; $i maxWorkers; $i++) {
$this->forkWorker($i);
}
// 监控子进程
while (!$this->shutdown) {
pcntl_signal_dispatch();
$status = null;
$pid = pcntl_wait($status, WNOHANG);
if ($pid > 0) {
echo "工作进程 {$pid} 退出,重新启动...n";
$this->restartWorker($pid);
}
sleep(1);
}
$this->stopAllWorkers();
}
private function forkWorker($workerId)
{
$pid = pcntl_fork();
if ($pid == -1) {
die("无法创建子进程");
} elseif ($pid == 0) {
// 子进程
$this->runWorker($workerId);
exit(0);
} else {
// 父进程记录PID
$this->workers[$pid] = [
'id' => $workerId,
'started_at' => time()
];
}
}
private function runWorker($workerId)
{
echo "工作进程 {$workerId} 启动,PID: " . getmypid() . "n";
$taskHandler = new TaskHandler();
while (!$this->shutdown) {
try {
// 获取任务(阻塞30秒)
$task = $this->queue->pop(30);
if ($task) {
echo "进程 {$workerId} 处理任务: {$task['id']}n";
// 执行任务
$result = $taskHandler->handle($task['data']);
// 标记完成
$this->queue->complete($task['id'], $result);
echo "进程 {$workerId} 完成任务: {$task['id']}n";
}
} catch (Exception $e) {
error_log("工作进程错误: " . $e->getMessage());
if (isset($task)) {
$this->queue->fail($task['id'], $e->getMessage());
}
// 短暂休息后继续
sleep(5);
}
}
}
public function shutdown()
{
echo "接收到关闭信号,停止所有工作进程...n";
$this->shutdown = true;
}
private function stopAllWorkers()
{
foreach (array_keys($this->workers) as $pid) {
posix_kill($pid, SIGTERM);
}
}
private function restartWorker($oldPid)
{
if (isset($this->workers[$oldPid])) {
$workerId = $this->workers[$oldPid]['id'];
unset($this->workers[$oldPid]);
$this->forkWorker($workerId);
}
}
}
/**
* 任务处理器基类
*/
abstract class TaskHandler
{
abstract public function handle($taskData);
protected function log($message)
{
file_put_contents(
'task_log.log',
date('Y-m-d H:i:s') . " - " . $message . "n",
FILE_APPEND
);
}
}
?>
四、实战案例:邮件发送系统
1. 邮件任务处理器
<?php
/**
* 邮件发送任务处理器
*/
class EmailTaskHandler extends TaskHandler
{
private $mailer;
public function __construct()
{
// 初始化邮件发送器(这里使用PHPMailer示例)
$this->mailer = new PHPMailerPHPMailerPHPMailer();
$this->mailer->isSMTP();
$this->mailer->Host = 'smtp.example.com';
$this->mailer->SMTPAuth = true;
$this->mailer->Username = 'user@example.com';
$this->mailer->Password = 'password';
$this->mailer->SMTPSecure = 'tls';
$this->mailer->Port = 587;
}
public function handle($taskData)
{
$this->log("开始发送邮件到: " . $taskData['to']);
$this->mailer->setFrom('noreply@example.com', '系统通知');
$this->mailer->addAddress($taskData['to']);
$this->mailer->Subject = $taskData['subject'];
$this->mailer->Body = $taskData['body'];
$this->mailer->isHTML(true);
if ($this->mailer->send()) {
$this->log("邮件发送成功: " . $taskData['to']);
return ['success' => true, 'message_id' => $this->mailer->getLastMessageID()];
} else {
throw new Exception("邮件发送失败: " . $this->mailer->ErrorInfo);
}
}
}
?>
2. Web控制器中使用队列
<?php
/**
* 用户注册控制器示例
*/
class RegisterController
{
private $queue;
public function __construct()
{
$this->queue = new RedisTaskQueue();
}
public function register($userData)
{
// 1. 保存用户到数据库
$userId = $this->saveUser($userData);
// 2. 创建欢迎邮件任务(高优先级)
$emailTask = [
'to' => $userData['email'],
'subject' => '欢迎注册我们的服务',
'body' => $this->renderWelcomeEmail($userData),
'user_id' => $userId
];
$this->queue->push($emailTask, 'high');
// 3. 创建后续跟进任务(普通优先级)
$followupTask = [
'type' => 'followup_email',
'user_id' => $userId,
'scheduled_time' => time() + 86400 // 24小时后
];
$this->queue->push($followupTask, 'normal');
// 4. 立即返回响应
return [
'success' => true,
'message' => '注册成功,欢迎邮件已发送',
'user_id' => $userId
];
}
private function renderWelcomeEmail($userData)
{
return "<h1>欢迎,{$userData['name']}!</h1>
<p>感谢您注册我们的服务。</p>
<p>您的账户已经激活,可以立即开始使用。</p>";
}
}
// 使用示例
$controller = new RegisterController();
$result = $controller->register([
'name' => '张三',
'email' => 'zhangsan@example.com',
'password' => 'hashed_password'
]);
echo json_encode($result);
?>
3. 启动Worker进程
<?php
// worker.php
require_once 'vendor/autoload.php';
require_once 'RedisTaskQueue.php';
require_once 'TaskWorkerManager.php';
require_once 'EmailTaskHandler.php';
// 注册自定义错误处理
set_error_handler(function($errno, $errstr, $errfile, $errline) {
error_log("Worker错误: [$errno] $errstr in $errfile on line $errline");
});
// 创建队列实例
$queue = new RedisTaskQueue('127.0.0.1', 6379);
// 启动Worker管理器
$manager = new TaskWorkerManager($queue);
$manager->start();
// 命令行启动:php worker.php --daemon
?>
五、性能优化与监控
1. 性能优化策略
- 连接池管理:复用Redis连接,避免频繁创建连接
- 批量处理:一次处理多个任务,减少IO操作
- 内存控制:监控Worker内存使用,防止内存泄漏
- 动态扩缩容:根据队列长度自动调整Worker数量
2. 监控系统实现
<?php
/**
* 队列监控仪表板
*/
class QueueMonitor
{
private $queue;
private $redis;
public function __construct(RedisTaskQueue $queue)
{
$this->queue = $queue;
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function getDashboardData()
{
$stats = $this->queue->getStats();
// 获取处理速度
$hourlyStats = $this->redis->get('queue_stats:hourly');
$dailyStats = $this->redis->get('queue_stats:daily');
// 获取失败任务详情
$failedTasks = $this->redis->lRange('task_failed', 0, 9);
return [
'current_stats' => $stats,
'performance' => [
'tasks_per_hour' => $hourlyStats['completed'] ?? 0,
'avg_processing_time' => $this->calculateAvgTime(),
'failure_rate' => $this->calculateFailureRate()
],
'recent_failures' => $failedTasks,
'system_health' => $this->checkSystemHealth()
];
}
public function alertIfNeeded($stats)
{
// 失败率超过5%报警
$failureRate = $this->calculateFailureRate();
if ($failureRate > 5) {
$this->sendAlert("队列失败率过高: {$failureRate}%");
}
// 积压任务过多报警
if ($stats['pending_normal'] > 1000) {
$this->sendAlert("队列积压严重: {$stats['pending_normal']} 个任务等待");
}
}
private function sendAlert($message)
{
// 集成到报警系统(邮件、Slack、钉钉等)
file_put_contents('alerts.log', date('Y-m-d H:i:s') . " - " . $message . "n", FILE_APPEND);
}
}
?>
3. 高级特性扩展
- 延迟队列:使用Redis的Sorted Set实现定时任务
- 任务依赖:实现任务之间的依赖关系
- 分布式部署:多服务器部署Worker,实现负载均衡
- 任务优先级:支持多级优先级队列
- 任务进度追踪:实时反馈任务执行进度
4. 生产环境部署建议
- 使用Supervisor管理Worker进程
- 配置Redis持久化和主从复制
- 实现日志轮转和监控告警
- 设置合理的任务超时时间
- 定期清理已完成的任务记录
六、总结与展望
本文详细介绍了基于PHP和Redis的异步任务队列系统的设计与实现。通过这个系统,我们可以:
- 显著提升Web应用的响应速度
- 提高系统的吞吐量和并发处理能力
- 实现任务的可靠执行和失败重试
- 方便地扩展新的任务类型
技术要点回顾:
- Redis的List数据结构非常适合实现队列
- PHP的多进程编程需要谨慎处理资源竞争
- 任务状态管理是保证系统可靠性的关键
- 监控和报警是生产环境必不可少的组件
未来扩展方向:
- 集成更专业的消息队列(RabbitMQ、Kafka)
- 实现可视化任务管理界面
- 支持分布式事务和最终一致性
- 容器化部署和自动扩缩容
通过本文的学习,您已经掌握了构建高性能PHP异步任务系统的核心技能。在实际项目中,可以根据具体需求调整和优化这个基础框架,构建出适合自己业务场景的异步处理系统。

