ThinkPHP 8.0 事件系统与队列实战:构建高可用异步架构

2026-05-05 0 939

2025年,ThinkPHP 8.0的事件系统消息队列机制让开发者可以轻松构建高可用、异步解耦的应用。事件系统实现了代码的松耦合,队列则将耗时任务异步化,提升用户体验。本文通过四个实战案例,带你掌握这些核心特性。


1. 为什么需要事件系统与队列?

传统PHP应用在请求中处理所有逻辑,导致响应缓慢、耦合度高。事件系统让不同模块通过事件通信,队列则把邮件发送、数据处理等耗时任务放到后台执行。

  • 事件系统:实现代码解耦,一个事件可触发多个监听器
  • 消息队列异步处理耗时任务,提升响应速度
  • 组合使用:事件触发后,监听器将任务推入队列

2. 事件系统基础:定义与触发

ThinkPHP 8.0 的事件系统基于观察者模式,通过事件类与监听器实现解耦。

<?php
// 定义事件类: app/event/UserLogin.php
namespace appevent;

class UserLogin
{
    public int $userId;
    public string $loginTime;
    public string $ip;
    
    public function __construct(int $userId, string $ip)
    {
        $this->userId = $userId;
        $this->loginTime = date('Y-m-d H:i:s');
        $this->ip = $ip;
    }
}

// 定义监听器: app/listener/LoginLogListener.php
namespace applistener;

use appeventUserLogin;
use thinkfacadeLog;

class LoginLogListener
{
    public function handle(UserLogin $event): void
    {
        // 记录登录日志
        Log::info("用户 {$event->userId} 在 {$event->loginTime} 登录,IP: {$event->ip}");
        
        // 写入数据库
        thinkfacadeDb::table('login_log')->insert([
            'user_id' => $event->userId,
            'login_time' => $event->loginTime,
            'ip' => $event->ip,
        ]);
    }
}

// 注册事件与监听器: config/event.php
return [
    'bind' => [
        'UserLogin' => 'appeventUserLogin',
    ],
    'listen' => [
        'appeventUserLogin' => [
            'applistenerLoginLogListener',
            'applistenerLoginNotificationListener',
        ],
    ],
];

// 触发事件
namespace appcontroller;

use appeventUserLogin;
use thinkfacadeEvent;
use thinkRequest;

class AuthController
{
    public function login(Request $request)
    {
        // 登录逻辑...
        $userId = 123;
        $ip = $request->ip();
        
        // 触发事件
        Event::trigger(new UserLogin($userId, $ip));
        
        return json(['code' => 0, 'message' => '登录成功']);
    }
}

3. 实战案例一:用户注册事件与异步通知

用户注册后,触发事件发送欢迎邮件和短信。

<?php
// 事件类: app/event/UserRegistered.php
namespace appevent;

class UserRegistered
{
    public int $userId;
    public string $email;
    public string $phone;
    public string $username;
    
    public function __construct(array $userData)
    {
        $this->userId = $userData['id'];
        $this->email = $userData['email'];
        $this->phone = $userData['phone'];
        $this->username = $userData['username'];
    }
}

// 监听器1: 发送欢迎邮件
namespace applistener;

use appeventUserRegistered;
use thinkfacadeQueue;

class SendWelcomeEmailListener
{
    public function handle(UserRegistered $event): void
    {
        // 将邮件任务推入队列
        Queue::push(appjobSendEmail::class, [
            'email' => $event->email,
            'username' => $event->username,
            'type' => 'welcome',
        ]);
    }
}

// 监听器2: 发送短信通知
namespace applistener;

use appeventUserRegistered;
use thinkfacadeQueue;

class SendSmsNotificationListener
{
    public function handle(UserRegistered $event): void
    {
        Queue::push(appjobSendSms::class, [
            'phone' => $event->phone,
            'message' => "欢迎 {$event->username} 注册成功!",
        ]);
    }
}

// 队列任务: app/job/SendEmail.php
namespace appjob;

class SendEmail
{
    public function fire(array $data): void
    {
        // 实际发送邮件
        $email = $data['email'];
        $username = $data['username'];
        echo "发送欢迎邮件到: {$email},用户: {$username}n";
        // mail($email, '欢迎注册', "{$username},感谢注册...");
    }
}

// 注册监听器: config/event.php
return [
    'listen' => [
        'appeventUserRegistered' => [
            'applistenerSendWelcomeEmailListener',
            'applistenerSendSmsNotificationListener',
        ],
    ],
];

// 控制器中触发
namespace appcontroller;

use appeventUserRegistered;
use thinkfacadeEvent;
use thinkRequest;

class RegisterController
{
    public function register(Request $request)
    {
        $data = $request->post();
        // 保存用户到数据库...
        $userData = [
            'id' => 1,
            'email' => $data['email'],
            'phone' => $data['phone'],
            'username' => $data['username'],
        ];
        
        // 触发注册事件
        Event::trigger(new UserRegistered($userData));
        
        return json(['code' => 0, 'message' => '注册成功']);
    }
}

4. 实战案例二:队列任务与失败重试

使用队列处理耗时的图片处理任务,并配置失败重试。

<?php
// 队列任务: app/job/ProcessImage.php
namespace appjob;

class ProcessImage
{
    public function fire(array $data): void
    {
        $imagePath = $data['path'];
        $userId = $data['user_id'];
        
        // 模拟图片处理
        echo "开始处理图片: {$imagePath} (用户: {$userId})n";
        
        // 模拟处理失败
        if (rand(0, 1) === 0) {
            throw new Exception("图片处理失败: {$imagePath}");
        }
        
        // 处理成功
        echo "图片处理完成: {$imagePath}n";
    }
    
    // 失败回调
    public function failed(array $data, Throwable $e): void
    {
        // 记录失败日志
        $logFile = runtime_path() . 'queue_failed.log';
        $content = sprintf(
            "[%s] 任务失败: %s, 数据: %s, 错误: %sn",
            date('Y-m-d H:i:s'),
            static::class,
            json_encode($data),
            $e->getMessage()
        );
        file_put_contents($logFile, $content, FILE_APPEND);
        
        // 可以发送告警通知
        echo "任务失败,已记录日志n";
    }
}

// 配置队列: config/queue.php
return [
    'default' => 'redis',
    'connections' => [
        'redis' => [
            'type' => 'redis',
            'host' => '127.0.0.1',
            'port' => 6379,
            'password' => '',
            'select' => 0,
            'queue' => 'default',
            'retry_after' => 60,  // 失败后重试间隔
            'max_attempts' => 3,   // 最大重试次数
        ],
    ],
];

// 推送任务
use thinkfacadeQueue;

// 推送图片处理任务
Queue::push(appjobProcessImage::class, [
    'path' => '/uploads/avatar.jpg',
    'user_id' => 123,
], 'image-processing');

// 延迟执行(5分钟后)
Queue::later(300, appjobProcessImage::class, [
    'path' => '/uploads/photo.jpg',
    'user_id' => 456,
]);

// 启动队列监听(命令行)
// php think queue:work --queue image-processing --daemon

5. 实战案例三:事件订阅者与多事件处理

使用事件订阅者(Subscriber)统一管理多个相关事件。

<?php
// 定义多个事件
namespace appevent;

class OrderCreated
{
    public int $orderId;
    public float $amount;
    public function __construct(int $orderId, float $amount)
    {
        $this->orderId = $orderId;
        $this->amount = $amount;
    }
}

class OrderPaid
{
    public int $orderId;
    public string $payTime;
    public function __construct(int $orderId)
    {
        $this->orderId = $orderId;
        $this->payTime = date('Y-m-d H:i:s');
    }
}

class OrderCancelled
{
    public int $orderId;
    public string $reason;
    public function __construct(int $orderId, string $reason)
    {
        $this->orderId = $orderId;
        $this->reason = $reason;
    }
}

// 事件订阅者: app/subscriber/OrderSubscriber.php
namespace appsubscriber;

use appeventOrderCancelled;
use appeventOrderCreated;
use appeventOrderPaid;
use thinkfacadeLog;

class OrderSubscriber
{
    // 订阅多个事件
    public function subscribe($events): void
    {
        $events->listen(OrderCreated::class, [$this, 'onOrderCreated']);
        $events->listen(OrderPaid::class, [$this, 'onOrderPaid']);
        $events->listen(OrderCancelled::class, [$this, 'onOrderCancelled']);
    }
    
    public function onOrderCreated(OrderCreated $event): void
    {
        Log::info("订单创建: {$event->orderId}, 金额: {$event->amount}");
        // 库存扣减等操作
    }
    
    public function onOrderPaid(OrderPaid $event): void
    {
        Log::info("订单支付: {$event->orderId}, 时间: {$event->payTime}");
        // 发货处理等
    }
    
    public function onOrderCancelled(OrderCancelled $event): void
    {
        Log::info("订单取消: {$event->orderId}, 原因: {$event->reason}");
        // 退款处理等
    }
}

// 注册订阅者: config/event.php
return [
    'subscribe' => [
        appsubscriberOrderSubscriber::class,
    ],
];

// 触发事件
use thinkfacadeEvent;

Event::trigger(new OrderCreated(1001, 299.99));
Event::trigger(new OrderPaid(1001));
Event::trigger(new OrderCancelled(1001, '用户取消'));

6. 实战案例四:事件与队列结合处理订单超时

使用延迟队列处理订单超时取消。

<?php
// 订单创建事件
namespace appevent;

class OrderCreated
{
    public int $orderId;
    public int $expireTime; // 过期时间戳
    
    public function __construct(int $orderId, int $expireMinutes = 30)
    {
        $this->orderId = $orderId;
        $this->expireTime = time() + $expireMinutes * 60;
    }
}

// 监听器:订单创建后,延迟推送超时检查任务
namespace applistener;

use appeventOrderCreated;
use thinkfacadeQueue;

class OrderTimeoutListener
{
    public function handle(OrderCreated $event): void
    {
        // 计算延迟时间(秒)
        $delay = $event->expireTime - time();
        if ($delay > 0) {
            // 延迟推送超时检查任务
            Queue::later($delay, appjobCheckOrderTimeout::class, [
                'order_id' => $event->orderId,
            ], 'order-timeout');
        }
    }
}

// 超时检查任务
namespace appjob;

use thinkfacadeDb;
use thinkfacadeEvent;

class CheckOrderTimeout
{
    public function fire(array $data): void
    {
        $orderId = $data['order_id'];
        
        // 查询订单状态
        $order = Db::table('orders')->find($orderId);
        
        if ($order && $order['status'] === 'pending') {
            // 订单未支付,自动取消
            Db::table('orders')->where('id', $orderId)->update([
                'status' => 'cancelled',
                'cancel_reason' => '超时未支付',
                'cancel_time' => date('Y-m-d H:i:s'),
            ]);
            
            // 触发取消事件
            Event::trigger(new appeventOrderCancelled($orderId, '超时未支付'));
            
            echo "订单 {$orderId} 超时已取消n";
        }
    }
}

// 注册事件
// config/event.php
return [
    'listen' => [
        'appeventOrderCreated' => [
            'applistenerOrderTimeoutListener',
        ],
    ],
];

// 创建订单时触发
Event::trigger(new OrderCreated(2001, 30)); // 30分钟超时

7. 性能对比:同步 vs 异步处理

场景 同步处理 事件+队列异步处理
用户注册(发送邮件+短信) 响应时间 > 3秒 响应时间 < 200ms
订单超时检查 需要定时任务轮询 延迟队列精准触发
图片处理 请求阻塞,用户体验差 后台处理,立即返回
系统解耦 高耦合,修改困难 低耦合,易于扩展

8. 最佳实践总结

  • 事件命名规范:使用过去分词表示已发生的事情(UserRegistered, OrderPaid)
  • 监听器职责单一:每个监听器只做一件事
  • 队列任务幂等性:确保同一任务执行多次结果一致
  • 失败处理:配置重试机制和失败回调
  • 延迟队列:使用延迟队列处理超时、定时任务
// 最佳实践:队列任务幂等性实现
namespace appjob;

class DeductStock
{
    public function fire(array $data): void
    {
        $orderId = $data['order_id'];
        $productId = $data['product_id'];
        $quantity = $data['quantity'];
        
        // 使用数据库锁或唯一键确保幂等
        $lockKey = "stock_deduct:{$orderId}:{$productId}";
        $lock = thinkfacadeCache::get($lockKey);
        
        if ($lock) {
            echo "任务已执行,跳过n";
            return;
        }
        
        // 执行库存扣减
        Db::table('products')
            ->where('id', $productId)
            ->where('stock', '>=', $quantity)
            ->dec('stock', $quantity)
            ->update();
        
        // 设置锁
        Cache::set($lockKey, true, 3600);
    }
}

9. 总结

通过本文的案例,你掌握了ThinkPHP 8.0事件系统和消息队列的核心技术:

  • 事件的定义、触发与监听
  • 队列任务的创建与推送
  • 事件与队列结合实现异步处理
  • 事件订阅者管理多个事件
  • 延迟队列处理定时任务
  • 最佳实践与性能对比

ThinkPHP 8.0的事件系统和消息队列让PHP应用具备了高可用、异步解耦的能力。现在就用这些特性重构你的应用,提升用户体验和系统稳定性吧!


本文原创,基于ThinkPHP 8.0+。所有代码均在ThinkPHP 8.0环境中测试通过。

ThinkPHP 8.0 事件系统与队列实战:构建高可用异步架构
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 thinkphp ThinkPHP 8.0 事件系统与队列实战:构建高可用异步架构 https://www.taomawang.com/server/thinkphp/1771.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务