原创技术教程 | 最后更新:2023年10月
一、异步队列的核心价值
在Web开发中,同步处理耗时任务(如邮件发送、图片处理、数据报表生成)会导致请求响应时间过长,影响用户体验。异步任务队列通过将耗时操作放入队列后台执行,实现请求的快速响应。本教程将构建一个基于Redis的PHP异步邮件发送系统,处理能力可达每秒千级邮件发送。
技术栈选择理由:
- Redis:内存数据库,提供List数据结构实现队列,支持持久化
- PHPMailer:专业的邮件发送库,支持SMTP和多种邮件协议
- Supervisor:进程管理工具,确保队列消费者持续运行
二、系统架构设计
┌─────────────────┐ ┌─────────────┐ ┌──────────────┐
│ Web应用层 │───▶│ Redis队列 │───▶│ Worker进程 │
│ (生产者) │ │ (消息中间件)│ │ (消费者) │
└─────────────────┘ └─────────────┘ └──────────────┘
│ │ │
│ 快速响应 │ 异步存储 │ 后台处理
▼ ▼ ▼
┌─────────────────┐ ┌─────────────┐ ┌──────────────┐
│ HTTP请求立即返回 │ │ 任务持久化 │ │ 邮件发送服务 │
└─────────────────┘ └─────────────┘ └──────────────┘
数据流转过程:
- 用户触发邮件发送请求
- 生产者将邮件任务序列化后存入Redis队列
- HTTP请求立即返回,告知用户”邮件正在发送”
- 独立的Worker进程从队列获取任务并执行
- 执行结果记录到日志或数据库
三、完整代码实现
1. 环境准备与依赖安装
# 安装Redis扩展
pecl install redis
# 使用Composer安装依赖
composer require phpmailer/phpmailer
composer require predis/predis
2. 队列生产者实现(Producer.php)
<?php
require 'vendor/autoload.php';
use PredisClient;
class MailQueueProducer {
private $redis;
private $queueName = 'mail_queue';
public function __construct() {
$this->redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0
]);
}
/**
* 将邮件任务加入队列
* @param array $mailData 邮件数据
* @return string 任务ID
*/
public function pushToQueue(array $mailData): string {
$taskId = uniqid('mail_', true);
$taskData = [
'id' => $taskId,
'data' => $mailData,
'created_at' => time(),
'status' => 'pending'
];
// 序列化任务数据
$serializedData = json_encode($taskData);
// 使用RPUSH将任务添加到队列尾部
$this->redis->rpush($this->queueName, $serializedData);
// 可选:将任务元数据存储到Hash中便于追踪
$this->redis->hset('mail_tasks', $taskId, $serializedData);
return $taskId;
}
/**
* 生成邮件发送任务
*/
public function createMailTask(
string $to,
string $subject,
string $body,
string $from = 'noreply@example.com'
): string {
$mailData = [
'to' => $to,
'subject' => $subject,
'body' => $body,
'from' => $from,
'is_html' => true,
'charset' => 'UTF-8'
];
return $this->pushToQueue($mailData);
}
}
// 使用示例
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
$producer = new MailQueueProducer();
$taskId = $producer->createMailTask(
$_POST['email'],
$_POST['subject'],
$_POST['message']
);
echo json_encode([
'success' => true,
'message' => '邮件已加入发送队列',
'task_id' => $taskId
]);
exit;
}
?>
3. 队列消费者实现(Consumer.php)
<?php
require 'vendor/autoload.php';
use PredisClient;
use PHPMailerPHPMailerPHPMailer;
use PHPMailerPHPMailerException;
class MailQueueConsumer {
private $redis;
private $mailer;
private $queueName = 'mail_queue';
private $maxRetries = 3;
public function __construct() {
// Redis连接
$this->redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0
]);
// 初始化PHPMailer
$this->mailer = new PHPMailer(true);
$this->configureMailer();
}
private function configureMailer(): void {
// SMTP配置(根据实际情况修改)
$this->mailer->isSMTP();
$this->mailer->Host = 'smtp.example.com';
$this->mailer->SMTPAuth = true;
$this->mailer->Username = 'your_email@example.com';
$this->mailer->Password = 'your_password';
$this->mailer->SMTPSecure = PHPMailer::ENCRYPTION_STARTTLS;
$this->mailer->Port = 587;
$this->mailer->CharSet = 'UTF-8';
}
/**
* 处理单个邮件任务
*/
private function processTask(array $taskData): bool {
$taskId = $taskData['id'];
$mailData = $taskData['data'];
try {
// 更新任务状态为处理中
$taskData['status'] = 'processing';
$taskData['started_at'] = time();
$this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
// 配置邮件
$this->mailer->setFrom($mailData['from'], '系统邮件');
$this->mailer->addAddress($mailData['to']);
$this->mailer->Subject = $mailData['subject'];
if ($mailData['is_html']) {
$this->mailer->isHTML(true);
$this->mailer->Body = $mailData['body'];
$this->mailer->AltBody = strip_tags($mailData['body']);
} else {
$this->mailer->Body = $mailData['body'];
}
// 发送邮件
$this->mailer->send();
// 更新任务状态为成功
$taskData['status'] = 'completed';
$taskData['completed_at'] = time();
$this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
$this->log("任务 {$taskId} 发送成功");
return true;
} catch (Exception $e) {
// 处理失败,重试逻辑
$taskData['retries'] = ($taskData['retries'] ?? 0) + 1;
$taskData['last_error'] = $e->getMessage();
if ($taskData['retries'] maxRetries) {
// 重新加入队列
$taskData['status'] = 'pending';
$this->redis->rpush($this->queueName, json_encode($taskData));
$this->log("任务 {$taskId} 发送失败,等待重试");
} else {
// 超过重试次数,标记为失败
$taskData['status'] = 'failed';
$this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
$this->log("任务 {$taskId} 发送失败,已达最大重试次数");
}
return false;
}
}
/**
* 启动消费者守护进程
*/
public function startConsuming(int $sleepTime = 1): void {
$this->log("邮件队列消费者启动...");
while (true) {
try {
// 使用BLPOP阻塞式获取任务(等待10秒)
$result = $this->redis->blpop([$this->queueName], 10);
if ($result) {
$taskJson = $result[1];
$taskData = json_decode($taskJson, true);
if ($taskData) {
$this->processTask($taskData);
}
}
// 防止CPU占用过高
if ($sleepTime > 0) {
usleep($sleepTime * 1000000);
}
} catch (Exception $e) {
$this->log("消费者异常: " . $e->getMessage());
sleep(5); // 发生异常时等待5秒再继续
}
}
}
private function log(string $message): void {
$timestamp = date('Y-m-d H:i:s');
echo "[{$timestamp}] {$message}n";
// 同时写入文件日志
file_put_contents(
'mail_queue.log',
"[{$timestamp}] {$message}n",
FILE_APPEND
);
}
}
// 启动消费者(命令行执行)
if (php_sapi_name() === 'cli') {
$consumer = new MailQueueConsumer();
$consumer->startConsuming();
} else {
die("此脚本仅支持命令行执行");
}
?>
4. 监控面板实现(Monitor.php)
<?php
require 'vendor/autoload.php';
use PredisClient;
class QueueMonitor {
private $redis;
public function __construct() {
$this->redis = new Client();
}
public function getQueueStats(): array {
$queueName = 'mail_queue';
return [
'queue_length' => $this->redis->llen($queueName),
'pending_tasks' => $this->countTasksByStatus('pending'),
'processing_tasks' => $this->countTasksByStatus('processing'),
'completed_tasks' => $this->countTasksByStatus('completed'),
'failed_tasks' => $this->countTasksByStatus('failed'),
'total_tasks' => $this->redis->hlen('mail_tasks'),
'memory_usage' => $this->getMemoryUsage()
];
}
private function countTasksByStatus(string $status): int {
$count = 0;
$tasks = $this->redis->hgetall('mail_tasks');
foreach ($tasks as $taskJson) {
$task = json_decode($taskJson, true);
if ($task['status'] === $status) {
$count++;
}
}
return $count;
}
private function getMemoryUsage(): string {
$info = $this->redis->info('memory');
return round($info['used_memory'] / 1024 / 1024, 2) . ' MB';
}
}
// Web监控界面
$monitor = new QueueMonitor();
$stats = $monitor->getQueueStats();
?>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>邮件队列监控面板</title>
</head>
<body>
<h1>邮件队列实时监控</h1>
<div>
<h2>队列统计</h2>
<ul>
<li>队列长度: <strong><?= $stats['queue_length'] ?></strong></li>
<li>等待中任务: <strong><?= $stats['pending_tasks'] ?></strong></li>
<li>处理中任务: <strong><?= $stats['processing_tasks'] ?></strong></li>
<li>已完成任务: <strong><?= $stats['completed_tasks'] ?></strong></li>
<li>失败任务: <strong><?= $stats['failed_tasks'] ?></strong></li>
<li>总任务数: <strong><?= $stats['total_tasks'] ?></strong></li>
<li>内存使用: <strong><?= $stats['memory_usage'] ?></strong></li>
</ul>
</div>
<div>
<h2>快速操作</h2>
<button onclick="location.reload()">刷新数据</button>
<button onclick="clearCompleted()">清理已完成任务</button>
</div>
<script>
function clearCompleted() {
if (confirm('确定要清理已完成的任务吗?')) {
fetch('clear_tasks.php')
.then(response => response.json())
.then(data => {
alert(data.message);
location.reload();
});
}
}
// 自动刷新(每30秒)
setInterval(() => {
location.reload();
}, 30000);
</script>
</body>
</html>
四、部署与监控
1. Supervisor配置
# /etc/supervisor/conf.d/mail_queue.conf
[program:mail_queue_worker]
command=php /path/to/Consumer.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4 # 启动4个消费者进程
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/mail_queue.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5
2. 启动与管理命令
# 重载Supervisor配置
sudo supervisorctl reread
sudo supervisorctl update
# 启动队列工作者
sudo supervisorctl start mail_queue_worker:*
# 查看状态
sudo supervisorctl status
# 重启所有工作者
sudo supervisorctl restart mail_queue_worker:*
# 查看日志
tail -f /var/log/mail_queue.log
3. 性能监控指标
- 队列积压:监控队列长度,超过阈值报警
- 处理速度:统计每分钟处理的任务数
- 失败率:失败任务占总任务的比例
- 内存使用:Redis内存占用情况
- 延迟时间:任务从创建到开始处理的时间差
五、性能优化策略
1. 连接池优化
// 使用连接池减少Redis连接开销
class RedisPool {
private static $instances = [];
private $pool;
public static function getInstance($configKey = 'default') {
if (!isset(self::$instances[$configKey])) {
self::$instances[$configKey] = new self($configKey);
}
return self::$instances[$configKey];
}
private function __construct($configKey) {
$this->pool = new SplQueue();
$config = $this->getConfig($configKey);
// 初始化连接池
for ($i = 0; $i pool->enqueue(new Client($config));
}
}
public function getConnection() {
if ($this->pool->count() > 0) {
return $this->pool->dequeue();
}
return new Client($this->getConfig());
}
public function releaseConnection($connection) {
$this->pool->enqueue($connection);
}
}
2. 批量处理优化
// 批量获取和处理任务
public function batchProcess(int $batchSize = 10): void {
$tasks = [];
// 批量获取任务
for ($i = 0; $i redis->lpop($this->queueName);
if ($result) {
$tasks[] = json_decode($result, true);
} else {
break;
}
}
if (empty($tasks)) {
return;
}
// 批量处理
foreach ($tasks as $task) {
$this->processTask($task);
}
// 记录批量处理结果
$this->log("批量处理完成,共处理 " . count($tasks) . " 个任务");
}
3. 故障恢复机制
- 死信队列:将多次失败的任务转移到死信队列
- 数据持久化:定期将Redis数据备份到磁盘
- 优雅关闭:收到停止信号时完成当前任务再退出
- 健康检查:实现HTTP健康检查端点
4. 扩展建议
- 多队列优先级:实现高、中、低优先级队列
- 延迟队列:使用Redis的Sorted Set实现延迟任务
- 集群部署:Redis集群提高可用性和性能
- 可视化监控:集成Grafana展示队列指标
- 自动扩缩容:根据队列长度动态调整Worker数量
总结
本文详细介绍了基于PHP和Redis构建异步任务队列的完整方案。通过生产者-消费者模式,实现了邮件发送的异步处理,显著提升了系统响应速度。系统具备以下特点:
- 高性能:Redis内存操作,支持高并发任务处理
- 高可靠:任务持久化、重试机制、故障恢复
- 易监控:完整的监控面板和日志系统
- 可扩展:支持水平扩展和多进程处理
- 生产就绪:包含部署方案和性能优化建议
此方案不仅适用于邮件发送,稍作修改即可应用于短信通知、图片处理、数据同步等各种异步场景,是构建高性能PHP应用的必备技术。

