PHP异步任务队列实战:基于Redis实现高性能邮件发送系统 | 后端开发教程

2025-12-09 0 904

原创技术教程 | 最后更新:2023年10月

一、异步队列的核心价值

在Web开发中,同步处理耗时任务(如邮件发送、图片处理、数据报表生成)会导致请求响应时间过长,影响用户体验。异步任务队列通过将耗时操作放入队列后台执行,实现请求的快速响应。本教程将构建一个基于Redis的PHP异步邮件发送系统,处理能力可达每秒千级邮件发送。

技术栈选择理由:

  • Redis:内存数据库,提供List数据结构实现队列,支持持久化
  • PHPMailer:专业的邮件发送库,支持SMTP和多种邮件协议
  • Supervisor:进程管理工具,确保队列消费者持续运行

二、系统架构设计


┌─────────────────┐    ┌─────────────┐    ┌──────────────┐
│   Web应用层     │───▶│ Redis队列   │───▶│  Worker进程  │
│  (生产者)       │    │  (消息中间件)│    │  (消费者)    │
└─────────────────┘    └─────────────┘    └──────────────┘
        │                      │                    │
        │ 快速响应              │ 异步存储           │ 后台处理
        ▼                      ▼                    ▼
┌─────────────────┐    ┌─────────────┐    ┌──────────────┐
│ HTTP请求立即返回 │    │  任务持久化  │    │  邮件发送服务 │
└─────────────────┘    └─────────────┘    └──────────────┘
                

数据流转过程:

  1. 用户触发邮件发送请求
  2. 生产者将邮件任务序列化后存入Redis队列
  3. HTTP请求立即返回,告知用户”邮件正在发送”
  4. 独立的Worker进程从队列获取任务并执行
  5. 执行结果记录到日志或数据库

三、完整代码实现

1. 环境准备与依赖安装


# 安装Redis扩展
pecl install redis

# 使用Composer安装依赖
composer require phpmailer/phpmailer
composer require predis/predis
                

2. 队列生产者实现(Producer.php)


<?php
require 'vendor/autoload.php';

use PredisClient;

class MailQueueProducer {
    private $redis;
    private $queueName = 'mail_queue';
    
    public function __construct() {
        $this->redis = new Client([
            'scheme' => 'tcp',
            'host'   => '127.0.0.1',
            'port'   => 6379,
            'database' => 0
        ]);
    }
    
    /**
     * 将邮件任务加入队列
     * @param array $mailData 邮件数据
     * @return string 任务ID
     */
    public function pushToQueue(array $mailData): string {
        $taskId = uniqid('mail_', true);
        $taskData = [
            'id' => $taskId,
            'data' => $mailData,
            'created_at' => time(),
            'status' => 'pending'
        ];
        
        // 序列化任务数据
        $serializedData = json_encode($taskData);
        
        // 使用RPUSH将任务添加到队列尾部
        $this->redis->rpush($this->queueName, $serializedData);
        
        // 可选:将任务元数据存储到Hash中便于追踪
        $this->redis->hset('mail_tasks', $taskId, $serializedData);
        
        return $taskId;
    }
    
    /**
     * 生成邮件发送任务
     */
    public function createMailTask(
        string $to,
        string $subject,
        string $body,
        string $from = 'noreply@example.com'
    ): string {
        $mailData = [
            'to' => $to,
            'subject' => $subject,
            'body' => $body,
            'from' => $from,
            'is_html' => true,
            'charset' => 'UTF-8'
        ];
        
        return $this->pushToQueue($mailData);
    }
}

// 使用示例
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    $producer = new MailQueueProducer();
    $taskId = $producer->createMailTask(
        $_POST['email'],
        $_POST['subject'],
        $_POST['message']
    );
    
    echo json_encode([
        'success' => true,
        'message' => '邮件已加入发送队列',
        'task_id' => $taskId
    ]);
    exit;
}
?>
                

3. 队列消费者实现(Consumer.php)


<?php
require 'vendor/autoload.php';

use PredisClient;
use PHPMailerPHPMailerPHPMailer;
use PHPMailerPHPMailerException;

class MailQueueConsumer {
    private $redis;
    private $mailer;
    private $queueName = 'mail_queue';
    private $maxRetries = 3;
    
    public function __construct() {
        // Redis连接
        $this->redis = new Client([
            'scheme' => 'tcp',
            'host'   => '127.0.0.1',
            'port'   => 6379,
            'database' => 0
        ]);
        
        // 初始化PHPMailer
        $this->mailer = new PHPMailer(true);
        $this->configureMailer();
    }
    
    private function configureMailer(): void {
        // SMTP配置(根据实际情况修改)
        $this->mailer->isSMTP();
        $this->mailer->Host = 'smtp.example.com';
        $this->mailer->SMTPAuth = true;
        $this->mailer->Username = 'your_email@example.com';
        $this->mailer->Password = 'your_password';
        $this->mailer->SMTPSecure = PHPMailer::ENCRYPTION_STARTTLS;
        $this->mailer->Port = 587;
        $this->mailer->CharSet = 'UTF-8';
    }
    
    /**
     * 处理单个邮件任务
     */
    private function processTask(array $taskData): bool {
        $taskId = $taskData['id'];
        $mailData = $taskData['data'];
        
        try {
            // 更新任务状态为处理中
            $taskData['status'] = 'processing';
            $taskData['started_at'] = time();
            $this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
            
            // 配置邮件
            $this->mailer->setFrom($mailData['from'], '系统邮件');
            $this->mailer->addAddress($mailData['to']);
            $this->mailer->Subject = $mailData['subject'];
            
            if ($mailData['is_html']) {
                $this->mailer->isHTML(true);
                $this->mailer->Body = $mailData['body'];
                $this->mailer->AltBody = strip_tags($mailData['body']);
            } else {
                $this->mailer->Body = $mailData['body'];
            }
            
            // 发送邮件
            $this->mailer->send();
            
            // 更新任务状态为成功
            $taskData['status'] = 'completed';
            $taskData['completed_at'] = time();
            $this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
            
            $this->log("任务 {$taskId} 发送成功");
            return true;
            
        } catch (Exception $e) {
            // 处理失败,重试逻辑
            $taskData['retries'] = ($taskData['retries'] ?? 0) + 1;
            $taskData['last_error'] = $e->getMessage();
            
            if ($taskData['retries'] maxRetries) {
                // 重新加入队列
                $taskData['status'] = 'pending';
                $this->redis->rpush($this->queueName, json_encode($taskData));
                $this->log("任务 {$taskId} 发送失败,等待重试");
            } else {
                // 超过重试次数,标记为失败
                $taskData['status'] = 'failed';
                $this->redis->hset('mail_tasks', $taskId, json_encode($taskData));
                $this->log("任务 {$taskId} 发送失败,已达最大重试次数");
            }
            
            return false;
        }
    }
    
    /**
     * 启动消费者守护进程
     */
    public function startConsuming(int $sleepTime = 1): void {
        $this->log("邮件队列消费者启动...");
        
        while (true) {
            try {
                // 使用BLPOP阻塞式获取任务(等待10秒)
                $result = $this->redis->blpop([$this->queueName], 10);
                
                if ($result) {
                    $taskJson = $result[1];
                    $taskData = json_decode($taskJson, true);
                    
                    if ($taskData) {
                        $this->processTask($taskData);
                    }
                }
                
                // 防止CPU占用过高
                if ($sleepTime > 0) {
                    usleep($sleepTime * 1000000);
                }
                
            } catch (Exception $e) {
                $this->log("消费者异常: " . $e->getMessage());
                sleep(5); // 发生异常时等待5秒再继续
            }
        }
    }
    
    private function log(string $message): void {
        $timestamp = date('Y-m-d H:i:s');
        echo "[{$timestamp}] {$message}n";
        
        // 同时写入文件日志
        file_put_contents(
            'mail_queue.log',
            "[{$timestamp}] {$message}n",
            FILE_APPEND
        );
    }
}

// 启动消费者(命令行执行)
if (php_sapi_name() === 'cli') {
    $consumer = new MailQueueConsumer();
    $consumer->startConsuming();
} else {
    die("此脚本仅支持命令行执行");
}
?>
                

4. 监控面板实现(Monitor.php)


<?php
require 'vendor/autoload.php';

use PredisClient;

class QueueMonitor {
    private $redis;
    
    public function __construct() {
        $this->redis = new Client();
    }
    
    public function getQueueStats(): array {
        $queueName = 'mail_queue';
        
        return [
            'queue_length' => $this->redis->llen($queueName),
            'pending_tasks' => $this->countTasksByStatus('pending'),
            'processing_tasks' => $this->countTasksByStatus('processing'),
            'completed_tasks' => $this->countTasksByStatus('completed'),
            'failed_tasks' => $this->countTasksByStatus('failed'),
            'total_tasks' => $this->redis->hlen('mail_tasks'),
            'memory_usage' => $this->getMemoryUsage()
        ];
    }
    
    private function countTasksByStatus(string $status): int {
        $count = 0;
        $tasks = $this->redis->hgetall('mail_tasks');
        
        foreach ($tasks as $taskJson) {
            $task = json_decode($taskJson, true);
            if ($task['status'] === $status) {
                $count++;
            }
        }
        
        return $count;
    }
    
    private function getMemoryUsage(): string {
        $info = $this->redis->info('memory');
        return round($info['used_memory'] / 1024 / 1024, 2) . ' MB';
    }
}

// Web监控界面
$monitor = new QueueMonitor();
$stats = $monitor->getQueueStats();
?>

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>邮件队列监控面板</title>
</head>
<body>
    <h1>邮件队列实时监控</h1>
    <div>
        <h2>队列统计</h2>
        <ul>
            <li>队列长度: <strong><?= $stats['queue_length'] ?></strong></li>
            <li>等待中任务: <strong><?= $stats['pending_tasks'] ?></strong></li>
            <li>处理中任务: <strong><?= $stats['processing_tasks'] ?></strong></li>
            <li>已完成任务: <strong><?= $stats['completed_tasks'] ?></strong></li>
            <li>失败任务: <strong><?= $stats['failed_tasks'] ?></strong></li>
            <li>总任务数: <strong><?= $stats['total_tasks'] ?></strong></li>
            <li>内存使用: <strong><?= $stats['memory_usage'] ?></strong></li>
        </ul>
    </div>
    
    <div>
        <h2>快速操作</h2>
        <button onclick="location.reload()">刷新数据</button>
        <button onclick="clearCompleted()">清理已完成任务</button>
    </div>
    
    <script>
    function clearCompleted() {
        if (confirm('确定要清理已完成的任务吗?')) {
            fetch('clear_tasks.php')
                .then(response => response.json())
                .then(data => {
                    alert(data.message);
                    location.reload();
                });
        }
    }
    
    // 自动刷新(每30秒)
    setInterval(() => {
        location.reload();
    }, 30000);
    </script>
</body>
</html>
                

四、部署与监控

1. Supervisor配置


# /etc/supervisor/conf.d/mail_queue.conf
[program:mail_queue_worker]
command=php /path/to/Consumer.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4  # 启动4个消费者进程
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/mail_queue.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5
                

2. 启动与管理命令


# 重载Supervisor配置
sudo supervisorctl reread
sudo supervisorctl update

# 启动队列工作者
sudo supervisorctl start mail_queue_worker:*

# 查看状态
sudo supervisorctl status

# 重启所有工作者
sudo supervisorctl restart mail_queue_worker:*

# 查看日志
tail -f /var/log/mail_queue.log
                

3. 性能监控指标

  • 队列积压:监控队列长度,超过阈值报警
  • 处理速度:统计每分钟处理的任务数
  • 失败率:失败任务占总任务的比例
  • 内存使用:Redis内存占用情况
  • 延迟时间:任务从创建到开始处理的时间差

五、性能优化策略

1. 连接池优化


// 使用连接池减少Redis连接开销
class RedisPool {
    private static $instances = [];
    private $pool;
    
    public static function getInstance($configKey = 'default') {
        if (!isset(self::$instances[$configKey])) {
            self::$instances[$configKey] = new self($configKey);
        }
        return self::$instances[$configKey];
    }
    
    private function __construct($configKey) {
        $this->pool = new SplQueue();
        $config = $this->getConfig($configKey);
        
        // 初始化连接池
        for ($i = 0; $i pool->enqueue(new Client($config));
        }
    }
    
    public function getConnection() {
        if ($this->pool->count() > 0) {
            return $this->pool->dequeue();
        }
        return new Client($this->getConfig());
    }
    
    public function releaseConnection($connection) {
        $this->pool->enqueue($connection);
    }
}
                

2. 批量处理优化


// 批量获取和处理任务
public function batchProcess(int $batchSize = 10): void {
    $tasks = [];
    
    // 批量获取任务
    for ($i = 0; $i redis->lpop($this->queueName);
        if ($result) {
            $tasks[] = json_decode($result, true);
        } else {
            break;
        }
    }
    
    if (empty($tasks)) {
        return;
    }
    
    // 批量处理
    foreach ($tasks as $task) {
        $this->processTask($task);
    }
    
    // 记录批量处理结果
    $this->log("批量处理完成,共处理 " . count($tasks) . " 个任务");
}
                

3. 故障恢复机制

  • 死信队列:将多次失败的任务转移到死信队列
  • 数据持久化:定期将Redis数据备份到磁盘
  • 优雅关闭:收到停止信号时完成当前任务再退出
  • 健康检查:实现HTTP健康检查端点

4. 扩展建议

  1. 多队列优先级:实现高、中、低优先级队列
  2. 延迟队列:使用Redis的Sorted Set实现延迟任务
  3. 集群部署:Redis集群提高可用性和性能
  4. 可视化监控:集成Grafana展示队列指标
  5. 自动扩缩容:根据队列长度动态调整Worker数量

总结

本文详细介绍了基于PHP和Redis构建异步任务队列的完整方案。通过生产者-消费者模式,实现了邮件发送的异步处理,显著提升了系统响应速度。系统具备以下特点:

  • 高性能:Redis内存操作,支持高并发任务处理
  • 高可靠:任务持久化、重试机制、故障恢复
  • 易监控:完整的监控面板和日志系统
  • 可扩展:支持水平扩展和多进程处理
  • 生产就绪:包含部署方案和性能优化建议

此方案不仅适用于邮件发送,稍作修改即可应用于短信通知、图片处理、数据同步等各种异步场景,是构建高性能PHP应用的必备技术。

PHP异步任务队列实战:基于Redis实现高性能邮件发送系统 | 后端开发教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis实现高性能邮件发送系统 | 后端开发教程 https://www.taomawang.com/server/php/1482.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务