在现代Web应用中,发送邮件、生成报表、同步数据等耗时操作如果同步执行,会严重拖慢接口响应速度,降低用户体验。ThinkPHP 8.0内置的队列系统提供了完美的异步任务解决方案,支持多种驱动(Redis、Database、RabbitMQ等),并且能与事件系统无缝整合。本文将带你从零开始,通过一个用户注册后异步发送欢迎邮件和短信的完整案例,深入掌握队列的配置、任务创建、延迟执行、任务链以及失败处理等核心技能。
一、队列系统的基础架构与驱动选择
ThinkPHP 8.0的队列组件通过 `topthink/think-queue` 扩展提供,它抽象了任务的生产、分发与消费过程。核心概念包括:
- 任务类 (Job):需要异步执行的具体业务逻辑,通常继承 `thinkqueueJob` 基类或实现相应接口。
- 队列连接 (Connection):任务的存储与分发通道,支持Redis、Database、Sync(同步)等驱动。
- 队列监听器 (Worker/Listener):消费队列中的任务并执行,通过命令行启动。
选择驱动时,Redis因其高性能和丰富的数据结构成为生产环境首选;Database驱动无需额外服务,适合小型项目;Sync驱动则用于开发调试。本文将使用Redis驱动进行讲解,确保具备最佳性能表现。
二、环境准备与队列配置
首先通过Composer安装队列扩展:
composer require topthink/think-queue
安装完成后,在项目配置文件 `config/queue.php` 中配置Redis连接信息:
<?php
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'database',
'table' => 'failed_jobs',
],
];
同时,为了记录失败任务,需要在数据库中创建失败任务表(使用Database驱动时):
CREATE TABLE `failed_jobs` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`connection` varchar(50) NOT NULL,
`queue` varchar(100) NOT NULL,
`payload` longtext NOT NULL,
`exception` longtext,
`failed_at` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
三、创建任务类与基础使用方法
任务类是队列执行的载体。下面创建一个发送欢迎邮件的任务 `SendWelcomeEmail`:
<?php
namespace appjob;
use thinkqueueJob;
class SendWelcomeEmail
{
/**
* 任务执行入口
* @param Job $job 当前任务实例
* @param array $data 自定义数据
*/
public function fire(Job $job, array $data)
{
// 获取传递的数据
$email = $data['email'];
$username = $data['username'];
try {
// 模拟发送邮件逻辑
// mail($email, '欢迎注册', "亲爱的{$username},感谢注册...");
// 任务执行成功删除任务
$job->delete();
// 可选:记录日志
trace("欢迎邮件已发送至 {$email}", 'info');
} catch (Exception $e) {
// 执行失败处理
if ($job->attempts() > 3) {
// 超过尝试次数,标记为失败并删除
$job->delete();
// 可将失败信息记录到 failed_jobs 表
$job->failed($e);
} else {
// 稍后重试(默认延迟10秒)
$job->release(10);
}
}
}
/**
* 任务失败回调(当任务被标记为失败时调用)
* @param array $data
*/
public function failed(array $data)
{
// 可在此发送警报通知开发者
trace("欢迎邮件任务最终失败,用户邮箱:{$data['email']}", 'error');
}
}
在控制器中分发该任务:
use thinkfacadeQueue;
// 用户注册成功后
$jobData = [
'email' => 'user@example.com',
'username' => '新用户',
];
// 推送到队列,第二个参数为队列名称(可选)
Queue::push(SendWelcomeEmail::class, $jobData, 'email_queue');
最后,启动队列监听器开始消费任务:
php think queue:listen --queue email_queue
执行该命令后,终端会持续监听 `email_queue` 队列,一旦有新任务立即执行。
四、延迟队列与任务链的高级应用
许多业务场景需要任务在一定时间后执行,例如用户注册15分钟后未激活发送提醒。这时可以使用延迟队列:
// 延迟15分钟执行
Queue::later(900, SendActivationReminder::class, $data, 'reminder_queue');
如果需要一系列任务按顺序执行,可使用任务链。例如注册后依次发送邮件、短信、并同步到CRM:
use thinkqueueQueue;
// 创建任务链
$chain = [
[SendWelcomeEmail::class, ['email' => $email, 'username' => $username]],
[SendWelcomeSms::class, ['mobile' => $mobile]],
[SyncToCrm::class, ['user_id' => $userId]],
];
Queue::chain($chain)->push();
任务链保证前一个任务执行成功后才执行下一个,若中间任务失败,默认会停止后续执行。
五、事件系统与队列的完美整合
ThinkPHP 8.0的事件系统可以进一步优化架构。通过事件解耦注册逻辑与后续异步操作,让代码更易维护。我们定义 `UserRegistered` 事件:
<?php
namespace appevent;
class UserRegistered
{
public $userId;
public $email;
public $mobile;
public function __construct($userId, $email, $mobile)
{
$this->userId = $userId;
$this->email = $email;
$this->mobile = $mobile;
}
}
接着创建事件监听器,在监听器中分发队列任务:
<?php
namespace applistener;
use appeventUserRegistered;
use thinkfacadeQueue;
use appjobSendWelcomeEmail;
use appjobSendWelcomeSms;
class UserRegisteredListener
{
public function handle(UserRegistered $event)
{
// 异步发送邮件
Queue::push(SendWelcomeEmail::class, [
'email' => $event->email,
'username' => '新用户',
], 'email_queue');
// 异步发送短信
Queue::push(SendWelcomeSms::class, [
'mobile' => $event->mobile,
], 'sms_queue');
}
}
在事件定义文件 `event.php` 中绑定事件与监听器:
return [
'bind' => [
'UserRegistered' => appeventUserRegistered::class,
],
'listen' => [
'UserRegistered' => [
applistenerUserRegisteredListener::class,
],
],
];
现在用户注册的业务代码变得极其简洁:
use appeventUserRegistered;
use thinkfacadeEvent;
// 注册逻辑...
$user = User::create([...]);
// 触发事件,后续异步操作由事件系统驱动
Event::trigger(new UserRegistered($user->id, $user->email, $user->mobile));
这种架构下,控制器只负责核心业务流程,异步任务由事件驱动分发到队列,实现了业务与基础设施的彻底解耦。
六、多队列与优先级控制
当系统中有大量不同类型任务时,需要为任务划分队列并分配优先级。ThinkPHP队列监听器支持同时监听多个队列并设置优先级:
php think queue:listen --queue high,email_queue,sms_queue,low
队列名称的排列顺序即为优先级顺序,`high` 队列中的任务会优先被消费。在分发任务时指定队列名称即可:
Queue::push(HighPriorityTask::class, $data, 'high');
Queue::push(LowPriorityTask::class, $data, 'low');
七、失败处理与监控告警
生产环境中必须对失败任务进行有效监控。在 `queue.php` 中配置 `failed` 驱动后,任务在达到最大尝试次数后会自动记录到 `failed_jobs` 表。你可以编写一个定时任务脚本,查询失败记录并发送邮件或钉钉告警:
use thinkfacadeDb;
$failedJobs = Db::table('failed_jobs')
->where('failed_at', '>=', date('Y-m-d H:i:s', strtotime('-1 hour')))
->select();
if (count($failedJobs) > 0) {
// 发送告警通知
(new appserviceAlertService())->send('队列任务失败告警', $failedJobs);
}
另外,可以使用 Supervisor 等工具守护 `queue:listen` 进程,确保消费者持续运行。典型Supervisor配置如下:
[program:thinkphp-queue]
command=/usr/bin/php /var/www/html/think queue:listen --queue high,email_queue,low
directory=/var/www/html
autostart=true
autorestart=true
user=www-data
numprocs=2
redirect_stderr=true
stdout_logfile=/var/log/supervisor/queue_stdout.log
八、性能优化与注意事项
使用队列时有几个关键点需要留意:
- 幂等性:任务可能会被重复执行,确保业务逻辑是幂等的(如发送邮件前检查是否已发送)。
- 内存泄漏:长时间运行的 `queue:listen` 可能会导致内存累积,可改用 `queue:work –once` 配合 cron 或使用 `–max-jobs` 参数限制单进程任务数。
- 失败重试策略:合理设置 `attempts` 和 `release` 延迟,避免雪崩效应。
- 数据库连接:长时间闲置可能导致数据库连接断开,可在任务执行前重连或设置断线重连。
九、总结
通过本文的实战指南,你已经完整掌握了ThinkPHP 8.0队列系统的核心能力:从基本任务推送到延迟队列、任务链,再到与事件系统协同构建优雅的异步架构。本文的案例可直接应用于用户注册、订单处理、报表生成等众多场景。合理利用队列,不仅能显著提升用户体验,还能让代码结构更加清晰、可扩展。立即在你的下一个项目中将耗时操作异步化,感受高性能带来的改变。

