电商系统里有一个很常见的场景:用户下单后如果30分钟内没有支付,订单就需要自动取消,释放库存。以前的做法是写一个定时任务每分钟扫一遍数据库,找出所有超时订单然后批量处理。这种轮询方式在订单量小的时候没问题,但数据量一大,每分钟的全表扫描就成了数据库的负担,而且延迟最差能到一分钟——用户体验不好。
更合理的方案是用消息队列的延迟任务:订单创建时往队列里投递一个30分钟后执行的任务,时间一到自动触发取消逻辑。ThinkPHP 8.0内置的队列组件配合Redis就能完整实现这套机制。这篇文章把整个落地过程拆开讲清楚。
一、ThinkPHP 8.0队列的安装和配置
ThinkPHP 8.0的队列功能由topthink/think-queue扩展包提供。先安装:
composer require topthink/think-queue
安装完成后,队列的配置文件位于config/queue.php。默认内容大致如下,我们需要把驱动改成Redis:
<?php
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
确保default设为redis。测试环境或小任务可用sync同步执行,方便调试。确认Redis服务已经启动并能正常连接。
二、创建第一个Job类
在ThinkPHP中,队列任务被封装成Job类。我们用命令行快速生成一个用于取消订单的Job:
php think make:job CancelUnpaidOrder
这会在app/job/CancelUnpaidOrder.php生成一个基础骨架。我们把它改造一下:
<?php
declare(strict_types=1);
namespace appjob;
use thinkfacadeDb;
use thinkfacadeLog;
use thinkqueueJob;
class CancelUnpaidOrder
{
/**
* 队列任务执行入口
* @param Job $job 当前任务对象
* @param array $data 投递时传入的数据
*/
public function fire(Job $job, array $data): void
{
$orderId = $data['order_id'] ?? 0;
if (empty($orderId)) {
// 数据无效,直接删除任务
$job->delete();
return;
}
// 查询当前订单状态
$order = Db::table('orders')->where('order_id', $orderId)->find();
if (!$order) {
$job->delete();
return;
}
// 只处理未支付状态
if ($order['status'] !== 'unpaid') {
$job->delete();
return;
}
// 检查是否确实超时(二次确认)
$createdAt = strtotime($order['created_at']);
$timeoutMinutes = 30;
if (time() - $createdAt release($timeoutMinutes * 60 - (time() - $createdAt));
return;
}
// 开启事务处理取消
Db::transaction(function () use ($order) {
// 更新订单状态为已取消
Db::table('orders')->where('order_id', $order['order_id'])->update([
'status' => 'cancelled',
'cancel_time' => date('Y-m-d H:i:s'),
'cancel_reason' => '支付超时自动取消',
]);
// 释放库存
Db::table('products')->where('id', $order['product_id'])->inc('stock', $order['quantity'])->update();
Log::info("订单 {$order['order_id']} 支付超时,已自动取消");
});
// 任务执行成功,删除
$job->delete();
}
/**
* 任务执行失败时的处理
* @param array $data 任务数据
*/
public function failed(array $data): void
{
Log::error("取消订单任务失败,订单ID: {$data['order_id']}");
}
}
几个关键点说明:
fire方法是任务入口,接收两个参数:$job代表当前队列任务,$data是投递时传入的数据。- 任务内部做了多重校验:订单是否存在、状态是否正确、超时时间是否真的到了。这是为了防止重复执行或者触发时机偏差导致错误取消。
$job->delete()表示任务执行完成,从队列中移除。$job->release($seconds)可以把任务放回队列并在指定秒数后重试,这里用于时间未到的情况。failed方法在任务达到最大重试次数后执行,用来记录故障日志。
三、在订单创建时投递延迟任务
当用户下单成功后,我们需要立即投递一个30分钟后执行的取消任务。业务层代码大致如下:
<?php
namespace appservice;
use thinkfacadeQueue;
use thinkfacadeDb;
class OrderService
{
public function createOrder(array $orderData): int
{
// 写入订单数据,状态为unpaid
$orderId = Db::table('orders')->insertGetId([
'order_no' => $this->generateOrderNo(),
'user_id' => $orderData['user_id'],
'product_id' => $orderData['product_id'],
'quantity' => $orderData['quantity'],
'amount' => $orderData['amount'],
'status' => 'unpaid',
'created_at' => date('Y-m-d H:i:s'),
]);
// 扣减库存(此处简略处理)
Db::table('products')->where('id', $orderData['product_id'])->dec('stock', $orderData['quantity'])->update();
// 投递延迟队列,30分钟后执行取消任务
Queue::later(30 * 60, appjobCancelUnpaidOrder::class, [
'order_id' => $orderId,
], 'order_cancel');
return $orderId;
}
private function generateOrderNo(): string
{
return date('YmdHis') . str_pad(rand(0, 9999), 4, '0', STR_PAD_LEFT);
}
}
这里Queue::later的四个参数分别是:延迟秒数、Job类名、投递数据、队列名称。队列名称order_cancel用来将任务放到指定的队列中,方便后续按队列启动消费进程。
四、启动队列消费进程
任务投递出去了,还需要一个监工来执行。ThinkPHP提供了命令行指令来启动队列消费进程:
php think queue:listen --queue order_cancel
或者使用queue:work,两者区别:
queue:listen会监听队列,每次有新任务就重新加载框架。适合开发环境,代码更新后无需重启。queue:work启动后常驻内存,不会重新加载框架。性能更高,但代码更新后需要重启进程。生产环境推荐使用,并通过Supervisor等工具守护进程。
生产环境下的典型启动命令:
php think queue:work redis --queue order_cancel --daemon
配置Supervisor的示例:
[program:thinkphp-order-cancel]
command=php /path/to/your/project/think queue:work redis --queue order_cancel --daemon
directory=/path/to/your/project
autostart=true
autorestart=true
user=www
numprocs=2
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/order_cancel_stdout.log
stderr_logfile=/var/log/supervisor/order_cancel_stderr.log
这样进程崩溃后Supervisor会自动重启,确保任务不会堆积。
五、处理用户支付和重复投递的问题
如果用户在30分钟内支付了,订单状态会变成paid。之前的Job逻辑里已经处理了这种情况:当任务执行时发现订单状态不是unpaid,就直接删除任务。这里没毛病。
但有个边界场景:用户在第29分钟支付,网络延迟导致支付回调在第31分钟才到达。此时取消任务可能已经在第30分钟执行了,就会把刚支付成功的订单给取消掉。解决方法是支付回调里也要做并发控制。
简单方案是在取消任务的SQL里加上WHERE status='unpaid'条件,并且使用乐观锁。上面已经在Db::transaction里处理了查询和更新,但更稳妥的是带上版本号或限制条件。实际改进:
$affected = Db::table('orders')
->where('order_id', $order['order_id'])
->where('status', 'unpaid')
->update([
'status' => 'cancelled',
'cancel_time' => date('Y-m-d H:i:s'),
]);
if ($affected === 0) {
// 订单状态已改变,任务结束
$job->delete();
return;
}
支付回调里也要做类似的乐观锁更新,保证只有第一个成功的更新生效。
六、失败任务和重试机制
ThinkPHP队列默认提供了失败重试。Job类里可以配置$maxAttempts属性来设置最大重试次数,以及$retryDelay属性设置重试间隔。也可以在投递任务时指定:
Queue::later(1800, appjobCancelUnpaidOrder::class, ['order_id' => $orderId], 'order_cancel', 3);
最后的参数3表示最多尝试3次。如果所有尝试都失败,会调用Job类的failed方法。
另外,建议开启失败任务记录。修改config/queue.php中failed配置:
'failed' => [
'type' => 'table',
'table' => 'failed_jobs',
],
然后创建记录表:
CREATE TABLE `failed_jobs` (
`id` bigint unsigned auto_increment,
`connection` text not null,
`queue` text not null,
`payload` longtext not null,
`exception` longtext null,
`failed_at` timestamp default current_timestamp,
primary key (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
这样失败的任务会被记录下来,后续可以在后台查看和手动重试。
七、队列监控的简单实现
队列跑起来了,但你还得知道它有没有积压、失败率多少。最简单的监控方式是通过Redis命令查看队列长度:
redis-cli LLEN queues:order_cancel
也可以在ThinkPHP里写一个简单的监控接口:
public function queueStatus()
{
$redis = Cache::store('redis')->handler();
$queueNames = ['order_cancel', 'default'];
$status = [];
foreach ($queueNames as $name) {
$status[$name] = [
'waiting' => $redis->lLen('queues:' . $name),
'delayed' => $redis->zCard('queues:' . $name . ':delayed'),
'failed' => Db::table('failed_jobs')->count(),
];
}
return json($status);
}
配合定时检查这个接口并设置告警阈值,就可以及时知道队列健康状态。
八、总结
用ThinkPHP 8.0的Redis队列处理订单超时取消,比传统的轮询方式更实时、更省数据库资源。整个方案的核心就三步:安装配置队列驱动、创建Job类、在业务节点投递延迟任务。配合Supervisor守护进程和失败记录,可以稳定地在生产环境运行。
除了订单超时,这种延迟任务的模式还适用于会员到期提醒、预约提前通知、优惠券过期处理等场景。把耗时的、需要延迟执行的逻辑从主业务流程中剥离出来丢进队列,控制器的响应速度会明显提升,系统的可伸缩性也更强。

