免费资源下载
原创技术教程 | 更新时间:2023年10月
一、异步任务队列的核心价值
在现代Web应用中,同步处理耗时任务会导致请求阻塞、用户体验下降。异步任务队列通过解耦任务触发与执行,实现:
- 提升响应速度:立即返回响应,后台异步处理
- 提高系统吞吐量:避免进程阻塞,充分利用资源
- 实现延迟执行:支持定时任务和延迟任务
- 增强系统可靠性:任务持久化,失败重试机制
本文将构建一个基于Redis的分布式延迟任务系统,支持精确到秒的延迟执行和任务优先级管理。
二、系统架构设计
2.1 核心组件
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
│ 任务生产者 │───▶│ Redis队列 │───▶│ 任务消费者 │
│ (Producer) │ │ (Sorted Set)│ │ (Consumer) │
└─────────────────┘ └──────────────┘ └──────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
│ Web应用/API │ │ 延迟队列 │ │ 工作进程 │
│ │ │ (ZSET) │ │ (Worker) │
└─────────────────┘ └──────────────┘ └──────────────┘
2.2 数据结构设计
使用Redis有序集合(ZSET)存储延迟任务:
- Score:任务执行时间戳(Unix timestamp)
- Member:JSON格式的任务数据
- 额外Hash存储:任务元信息和重试次数
三、核心代码实现
3.1 任务队列管理器
<?php
/**
* Redis延迟任务队列管理器
*/
class RedisDelayQueue
{
private $redis;
private $queueKey = 'delay_queue:tasks';
private $metaKeyPrefix = 'delay_queue:meta:';
public function __construct($config = [])
{
$this->redis = new Redis();
$this->redis->connect(
$config['host'] ?? '127.0.0.1',
$config['port'] ?? 6379
);
if (!empty($config['password'])) {
$this->redis->auth($config['password']);
}
if (!empty($config['database'])) {
$this->redis->select($config['database']);
}
}
/**
* 添加延迟任务
* @param string $taskId 任务ID
* @param array $data 任务数据
* @param int $delaySeconds 延迟秒数
* @param int $priority 优先级(1-10)
* @return bool
*/
public function addTask($taskId, $data, $delaySeconds, $priority = 5)
{
$executeTime = time() + $delaySeconds;
$adjustedTime = $executeTime - ($priority - 1);
$task = [
'id' => $taskId,
'data' => $data,
'created_at' => time(),
'priority' => $priority,
'retry_count' => 0
];
// 存储到有序集合
$result = $this->redis->zAdd(
$this->queueKey,
$adjustedTime,
json_encode($task)
);
if ($result) {
// 存储元数据
$metaKey = $this->metaKeyPrefix . $taskId;
$this->redis->hMSet($metaKey, [
'status' => 'pending',
'max_retries' => 3,
'current_retries' => 0
]);
$this->redis->expire($metaKey, 86400 * 7);
}
return $result;
}
/**
* 获取待执行任务
* @return array|null
*/
public function getDueTasks($limit = 10)
{
$now = time();
$tasks = [];
// 获取score小于等于当前时间的任务
$items = $this->redis->zRangeByScore(
$this->queueKey,
0,
$now,
['limit' => [0, $limit]]
);
foreach ($items as $item) {
$task = json_decode($item, true);
if ($task) {
$tasks[] = $task;
// 从有序集合中移除
$this->redis->zRem($this->queueKey, $item);
}
}
return $tasks;
}
/**
* 更新任务状态
*/
public function updateTaskStatus($taskId, $status, $error = null)
{
$metaKey = $this->metaKeyPrefix . $taskId;
$updateData = ['status' => $status];
if ($error) {
$updateData['last_error'] = $error;
$updateData['failed_at'] = time();
}
if ($status === 'processing') {
$updateData['started_at'] = time();
} elseif ($status === 'completed') {
$updateData['completed_at'] = time();
}
return $this->redis->hMSet($metaKey, $updateData);
}
}
?>
3.2 任务处理器基类
<?php
/**
* 抽象任务处理器
*/
abstract class TaskHandler
{
protected $maxRetries = 3;
protected $retryDelay = 60;
abstract public function handle($taskData);
public function failed($taskData, $exception)
{
// 默认失败处理:记录日志
error_log(sprintf(
"Task failed: %s, Error: %s",
json_encode($taskData),
$exception->getMessage()
));
}
public function shouldRetry($retryCount, $exception)
{
return $retryCount < $this->maxRetries;
}
public function getRetryDelay($retryCount)
{
return $this->retryDelay * pow(2, $retryCount);
}
}
/**
* 邮件发送任务处理器
*/
class EmailTaskHandler extends TaskHandler
{
public function handle($taskData)
{
$email = $taskData['email'];
$subject = $taskData['subject'];
$content = $taskData['content'];
// 模拟邮件发送
if (!filter_var($email, FILTER_VALIDATE_EMAIL)) {
throw new InvalidArgumentException("Invalid email address");
}
// 实际发送邮件逻辑
// mail($email, $subject, $content);
return true;
}
public function shouldRetry($retryCount, $exception)
{
// 对于无效邮箱,不重试
if ($exception instanceof InvalidArgumentException) {
return false;
}
return parent::shouldRetry($retryCount, $exception);
}
}
?>
四、高级特性与优化
4.1 分布式工作者实现
<?php
/**
* 分布式任务工作者
*/
class DistributedWorker
{
private $queue;
private $handlers = [];
private $isRunning = false;
private $workerId;
public function __construct(RedisDelayQueue $queue)
{
$this->queue = $queue;
$this->workerId = uniqid('worker_', true);
}
public function registerHandler($taskType, TaskHandler $handler)
{
$this->handlers[$taskType] = $handler;
}
public function start($options = [])
{
$this->isRunning = true;
$sleepTime = $options['sleep'] ?? 1;
pcntl_signal(SIGTERM, [$this, 'shutdown']);
pcntl_signal(SIGINT, [$this, 'shutdown']);
echo "Worker {$this->workerId} startedn";
while ($this->isRunning) {
pcntl_signal_dispatch();
try {
$tasks = $this->queue->getDueTasks(5);
if (empty($tasks)) {
sleep($sleepTime);
continue;
}
foreach ($tasks as $task) {
$this->processTask($task);
}
} catch (Exception $e) {
error_log("Worker error: " . $e->getMessage());
sleep($sleepTime * 2);
}
}
}
private function processTask($task)
{
$taskId = $task['id'];
$taskType = $task['data']['type'] ?? 'default';
$this->queue->updateTaskStatus($taskId, 'processing');
try {
if (!isset($this->handlers[$taskType])) {
throw new RuntimeException("No handler for task type: {$taskType}");
}
$handler = $this->handlers[$taskType];
$result = $handler->handle($task['data']);
$this->queue->updateTaskStatus($taskId, 'completed');
echo "Task {$taskId} completed successfullyn";
} catch (Exception $e) {
$this->handleTaskFailure($task, $e, $handler ?? null);
}
}
private function handleTaskFailure($task, $exception, $handler)
{
$taskId = $task['id'];
$retryCount = $task['retry_count'] ?? 0;
if ($handler && $handler->shouldRetry($retryCount, $exception)) {
$delay = $handler->getRetryDelay($retryCount);
$task['retry_count'] = $retryCount + 1;
$this->queue->addTask(
$taskId . '_retry_' . ($retryCount + 1),
$task['data'],
$delay,
$task['priority']
);
echo "Task {$taskId} scheduled for retry in {$delay} secondsn";
} else {
$this->queue->updateTaskStatus($taskId, 'failed', $exception->getMessage());
if ($handler) {
$handler->failed($task['data'], $exception);
}
echo "Task {$taskId} failed permanently: " . $exception->getMessage() . "n";
}
}
public function shutdown($signal)
{
echo "Worker {$this->workerId} shutting down...n";
$this->isRunning = false;
}
}
?>
五、实战案例:订单超时关闭
5.1 订单服务集成
<?php
/**
* 订单服务 - 集成延迟队列
*/
class OrderService
{
private $delayQueue;
private $orderTimeout = 1800; // 30分钟
public function __construct(RedisDelayQueue $queue)
{
$this->delayQueue = $queue;
}
/**
* 创建订单并设置超时关闭任务
*/
public function createOrder($orderData)
{
$orderId = 'ORDER_' . time() . '_' . uniqid();
// 保存订单到数据库
$this->saveOrderToDatabase($orderId, $orderData);
// 添加延迟任务:30分钟后关闭未支付订单
$taskId = 'order_timeout_' . $orderId;
$taskData = [
'type' => 'order_timeout',
'order_id' => $orderId,
'action' => 'close_unpaid_order'
];
$this->delayQueue->addTask(
$taskId,
$taskData,
$this->orderTimeout,
8 // 较高优先级
);
return $orderId;
}
/**
* 订单支付成功,取消超时任务
*/
public function markOrderPaid($orderId)
{
// 更新订单状态
$this->updateOrderStatus($orderId, 'paid');
// 在实际应用中,这里需要实现任务取消逻辑
// 可以通过存储任务ID到订单关联表,然后删除对应任务
}
}
/**
* 订单超时处理器
*/
class OrderTimeoutHandler extends TaskHandler
{
private $orderService;
public function __construct($orderService)
{
$this->orderService = $orderService;
$this->maxRetries = 2;
}
public function handle($taskData)
{
$orderId = $taskData['order_id'];
// 检查订单状态
$order = $this->orderService->getOrder($orderId);
if (!$order) {
throw new RuntimeException("Order not found: {$orderId}");
}
if ($order['status'] !== 'pending') {
// 订单已处理,无需操作
return true;
}
// 执行订单关闭逻辑
$this->orderService->closeOrder($orderId, 'timeout');
// 发送通知
$this->sendTimeoutNotification($order);
return true;
}
private function sendTimeoutNotification($order)
{
// 发送邮件或短信通知
$message = "您的订单 {$order['order_no']} 因超时未支付已自动关闭";
// 实际发送逻辑...
}
}
?>
5.2 启动脚本
<?php
// worker.php
require_once 'RedisDelayQueue.php';
require_once 'TaskHandler.php';
require_once 'DistributedWorker.php';
require_once 'OrderService.php';
// 配置Redis连接
$redisConfig = [
'host' => '127.0.0.1',
'port' => 6379,
'database' => 1
];
// 初始化队列
$queue = new RedisDelayQueue($redisConfig);
// 初始化工作者
$worker = new DistributedWorker($queue);
// 注册处理器
$orderService = new OrderService($queue);
$worker->registerHandler('order_timeout', new OrderTimeoutHandler($orderService));
$worker->registerHandler('email', new EmailTaskHandler());
// 启动工作者
$worker->start([
'sleep' => 1, // 无任务时休眠秒数
]);
// 命令行启动:php worker.php --name=worker1
?>
六、监控与运维
6.1 监控指标收集
<?php
/**
* 队列监控服务
*/
class QueueMonitor
{
private $queue;
private $metrics = [];
public function __construct(RedisDelayQueue $queue)
{
$this->queue = $queue;
}
public function collectMetrics()
{
$metrics = [
'pending_tasks' => $this->getPendingCount(),
'delayed_tasks' => $this->getDelayedCount(),
'processing_tasks' => $this->getProcessingCount(),
'failed_tasks' => $this->getFailedCount(),
'avg_process_time' => $this->getAverageProcessTime()
];
$this->metrics = array_merge($this->metrics, $metrics);
return $metrics;
}
private function getPendingCount()
{
// 获取待处理任务数量
$now = time();
return $this->queue->getRedis()->zCount(
'delay_queue:tasks',
0,
$now
);
}
public function generateReport()
{
$metrics = $this->collectMetrics();
$report = [
'timestamp' => date('Y-m-d H:i:s'),
'metrics' => $metrics,
'alerts' => $this->checkAlerts($metrics)
];
return $report;
}
private function checkAlerts($metrics)
{
$alerts = [];
if ($metrics['pending_tasks'] > 1000) {
$alerts[] = '警告:待处理任务过多';
}
if ($metrics['failed_tasks'] > 100) {
$alerts[] = '警告:失败任务数量异常';
}
return $alerts;
}
}
?>
6.2 运维建议
- Redis持久化:配置AOF和RDB确保任务不丢失
- 监控告警:监控队列长度、处理延迟、失败率
- 水平扩展:多个工作者实例消费同一队列
- 灰度发布:新版本处理器逐步替换旧版本
- 日志记录:详细记录任务生命周期便于排查
总结
本文详细介绍了基于Redis的PHP异步延迟任务系统的设计与实现。该系统具有以下特点:
- 高性能:利用Redis内存操作,支持高并发任务处理
- 可靠性:任务持久化、失败重试、状态跟踪
- 灵活性:支持延迟执行、优先级、分布式部署
- 易扩展:模块化设计,方便添加新的任务类型
通过实际案例展示了如何在电商订单系统中应用该方案,有效解决了订单超时关闭的业务需求。读者可以根据实际业务需求,扩展更多任务处理器,构建适合自己业务场景的异步任务系统。

