PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级编程教程

2026-01-09 0 990
免费资源下载

作者:PHP架构师 | 发布日期:2023年10月

一、异步任务系统概述

在现代Web应用中,异步任务处理已成为提升系统性能的关键技术。传统的同步处理方式在面对耗时操作时(如邮件发送、图片处理、数据报表生成等)会导致请求阻塞,严重影响用户体验。本文将介绍一种基于Redis的PHP异步延迟任务系统,能够实现任务的延迟执行、失败重试和优先级管理。

1.1 为什么选择Redis作为任务队列

  • 高性能:Redis内存操作,读写速度极快
  • 丰富的数据结构:有序集合(zset)天然支持延迟队列
  • 持久化支持:RDB/AOF保证任务不丢失
  • 原子操作:Lua脚本保证操作一致性

二、系统架构设计


                +----------------+     +----------------+     +----------------+
                |   任务生产者    |---->|   Redis队列    |---->|   任务消费者    |
                |   (Producer)   |     |   (Redis)     |     |   (Consumer)   |
                +----------------+     +----------------+     +----------------+
                        |                       |                       |
                +----------------+     +----------------+     +----------------+
                |   Web应用层    |     |  延迟有序集合  |     |   Worker进程   |
                |               |     |   (ZSET)      |     |               |
                +----------------+     +----------------+     +----------------+
                

2.1 核心组件说明

任务生产者:负责创建任务,将任务序列化后存入Redis

Redis存储结构

  • 延迟队列:使用ZSET存储,score为执行时间戳
  • 就绪队列:使用LIST存储,等待执行的任务
  • 失败队列:使用LIST存储,执行失败的任务

任务消费者:守护进程,从队列中获取并执行任务

三、核心代码实现

3.1 任务类定义


<?php
/**
 * 异步任务基类
 */
abstract class AsyncTask
{
    protected $id;
    protected $data;
    protected $executeAt;
    protected $maxRetries = 3;
    protected $retryCount = 0;
    protected $priority = 1; // 1-10,数值越大优先级越高
    
    public function __construct(array $data, int $delaySeconds = 0)
    {
        $this->id = uniqid('task_', true);
        $this->data = $data;
        $this->executeAt = time() + $delaySeconds;
    }
    
    abstract public function execute(): bool;
    
    public function toArray(): array
    {
        return [
            'id' => $this->id,
            'class' => get_class($this),
            'data' => $this->data,
            'execute_at' => $this->executeAt,
            'priority' => $this->priority,
            'max_retries' => $this->maxRetries,
            'retry_count' => $this->retryCount
        ];
    }
    
    public static function fromArray(array $data): self
    {
        $task = new static($data['data']);
        $task->id = $data['id'];
        $task->executeAt = $data['execute_at'];
        $task->priority = $data['priority'];
        $task->maxRetries = $data['max_retries'];
        $task->retryCount = $data['retry_count'];
        return $task;
    }
}

/**
 * 邮件发送任务示例
 */
class EmailTask extends AsyncTask
{
    public function execute(): bool
    {
        $data = $this->data;
        
        // 模拟邮件发送逻辑
        $success = $this->sendEmail(
            $data['to'],
            $data['subject'],
            $data['body']
        );
        
        if (!$success && $this->retryCount maxRetries) {
            $this->retryCount++;
            return false; // 触发重试
        }
        
        return $success;
    }
    
    private function sendEmail($to, $subject, $body): bool
    {
        // 实际邮件发送逻辑
        // 这里简化为随机成功/失败
        return rand(0, 1) === 1;
    }
}
?>

3.2 队列管理器实现


<?php
/**
 * Redis队列管理器
 */
class RedisQueueManager
{
    private $redis;
    private $queueName;
    
    // 队列键名定义
    const KEY_DELAYED_QUEUE = 'async_tasks:delayed';
    const KEY_READY_QUEUE = 'async_tasks:ready';
    const KEY_FAILED_QUEUE = 'async_tasks:failed';
    const KEY_PROCESSING_QUEUE = 'async_tasks:processing';
    
    public function __construct(Redis $redis, string $queueName = 'default')
    {
        $this->redis = $redis;
        $this->queueName = $queueName;
    }
    
    /**
     * 发布延迟任务
     */
    public function publishDelayedTask(AsyncTask $task): bool
    {
        $taskData = json_encode($task->toArray());
        $score = $task->executeAt;
        
        return $this->redis->zAdd(
            self::KEY_DELAYED_QUEUE . ':' . $this->queueName,
            $score,
            $taskData
        ) > 0;
    }
    
    /**
     * 将到期的延迟任务转移到就绪队列
     */
    public function moveExpiredTasks(): int
    {
        $now = time();
        $delayedKey = self::KEY_DELAYED_QUEUE . ':' . $this->queueName;
        $readyKey = self::KEY_READY_QUEUE . ':' . $this->queueName;
        
        // 使用Lua脚本保证原子性
        $lua = <<<LUA
        local delayed_key = KEYS[1]
        local ready_key = KEYS[2]
        local max_time = ARGV[1]
        local limit = ARGV[2]
        
        local tasks = redis.call('zRangeByScore', delayed_key, 0, max_time, 'LIMIT', 0, limit)
        
        if #tasks > 0 then
            redis.call('zRemRangeByScore', delayed_key, 0, max_time)
            for i, task in ipairs(tasks) do
                redis.call('rPush', ready_key, task)
            end
        end
        
        return #tasks
        LUA;
        
        $result = $this->redis->eval(
            $lua,
            [$delayedKey, $readyKey, $now, 100],
            2
        );
        
        return $result ?: 0;
    }
    
    /**
     * 消费任务
     */
    public function consume(int $timeout = 5): ?AsyncTask
    {
        $readyKey = self::KEY_READY_QUEUE . ':' . $this->queueName;
        $processingKey = self::KEY_PROCESSING_QUEUE . ':' . $this->queueName;
        
        // BRPOPLPUSH 原子操作,防止任务丢失
        $taskData = $this->redis->brpoplpush(
            $readyKey,
            $processingKey,
            $timeout
        );
        
        if (!$taskData) {
            return null;
        }
        
        $taskArray = json_decode($taskData, true);
        
        if (json_last_error() !== JSON_ERROR_NONE) {
            $this->moveToFailedQueue($taskData, 'JSON解析失败');
            return null;
        }
        
        if (!class_exists($taskArray['class'])) {
            $this->moveToFailedQueue($taskData, '任务类不存在');
            return null;
        }
        
        return call_user_func([$taskArray['class'], 'fromArray'], $taskArray);
    }
    
    /**
     * 确认任务完成
     */
    public function ack(string $taskId): bool
    {
        $processingKey = self::KEY_PROCESSING_QUEUE . ':' . $this->queueName;
        
        // 从处理队列中移除
        return $this->redis->lRem($processingKey, json_encode(['id' => $taskId]), 1) > 0;
    }
    
    /**
     * 任务失败处理
     */
    public function handleFailure(AsyncTask $task, string $error): void
    {
        $taskData = json_encode($task->toArray());
        $failedKey = self::KEY_FAILED_QUEUE . ':' . $this->queueName;
        
        $failedItem = [
            'task' => $taskData,
            'error' => $error,
            'failed_at' => time(),
            'retry_count' => $task->retryCount
        ];
        
        $this->redis->rPush($failedKey, json_encode($failedItem));
        
        // 如果还有重试机会,重新加入延迟队列
        if ($task->retryCount maxRetries) {
            $task->executeAt = time() + pow(2, $task->retryCount) * 60; // 指数退避
            $this->publishDelayedTask($task);
        }
    }
    
    private function moveToFailedQueue(string $taskData, string $error): void
    {
        $failedKey = self::KEY_FAILED_QUEUE . ':' . $this->queueName;
        
        $failedItem = [
            'task' => $taskData,
            'error' => $error,
            'failed_at' => time()
        ];
        
        $this->redis->rPush($failedKey, json_encode($failedItem));
    }
}
?>

3.3 Worker守护进程


<?php
/**
 * Worker守护进程
 */
class TaskWorker
{
    private $queueManager;
    private $isRunning = false;
    private $maxJobs = 1000;
    private $processedJobs = 0;
    
    public function __construct(RedisQueueManager $queueManager)
    {
        $this->queueManager = $queueManager;
        pcntl_async_signals(true);
        
        // 注册信号处理器
        pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        pcntl_signal(SIGINT, [$this, 'handleSignal']);
    }
    
    public function start(): void
    {
        $this->isRunning = true;
        $this->log('Worker启动');
        
        while ($this->isRunning && $this->processedJobs maxJobs) {
            try {
                // 1. 转移到期任务
                $moved = $this->queueManager->moveExpiredTasks();
                if ($moved > 0) {
                    $this->log("转移了 {$moved} 个到期任务到就绪队列");
                }
                
                // 2. 消费任务
                $task = $this->queueManager->consume(1);
                
                if ($task) {
                    $this->processTask($task);
                    $this->processedJobs++;
                }
                
                // 3. 防止CPU占用过高
                usleep(100000); // 100ms
                
            } catch (Exception $e) {
                $this->log("处理异常: " . $e->getMessage());
                sleep(5); // 异常后等待5秒
            }
        }
        
        $this->log('Worker停止');
    }
    
    private function processTask(AsyncTask $task): void
    {
        $this->log("开始处理任务: {$task->id}");
        
        try {
            $startTime = microtime(true);
            $success = $task->execute();
            $elapsed = round((microtime(true) - $startTime) * 1000, 2);
            
            if ($success) {
                $this->queueManager->ack($task->id);
                $this->log("任务 {$task->id} 执行成功,耗时 {$elapsed}ms");
            } else {
                $this->queueManager->handleFailure($task, '任务执行失败');
                $this->log("任务 {$task->id} 执行失败,已加入重试队列");
            }
            
        } catch (Throwable $e) {
            $this->queueManager->handleFailure($task, $e->getMessage());
            $this->log("任务 {$task->id} 执行异常: " . $e->getMessage());
        }
    }
    
    public function handleSignal(int $signal): void
    {
        $this->log("收到信号 {$signal},准备停止...");
        $this->isRunning = false;
    }
    
    private function log(string $message): void
    {
        $timestamp = date('Y-m-d H:i:s');
        echo "[{$timestamp}] {$message}n";
    }
}

// 使用示例
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queueManager = new RedisQueueManager($redis, 'email_queue');
$worker = new TaskWorker($queueManager);

// 生产任务示例
$emailTask = new EmailTask([
    'to' => 'user@example.com',
    'subject' => '欢迎邮件',
    'body' => '感谢注册我们的服务!'
], 60); // 60秒后执行

$queueManager->publishDelayedTask($emailTask);

// 启动worker(在实际环境中应作为守护进程运行)
// $worker->start();
?>

四、高级特性扩展

4.1 优先级队列实现


<?php
/**
 * 支持优先级的队列管理器
 */
class PriorityQueueManager extends RedisQueueManager
{
    public function publishDelayedTask(AsyncTask $task): bool
    {
        $taskData = json_encode($task->toArray());
        $score = $task->executeAt;
        
        // 将优先级信息编码到score中
        // 格式:执行时间戳.优先级(补零到2位)
        $priorityScore = sprintf('%d.%02d', $score, 10 - $task->priority);
        
        return $this->redis->zAdd(
            self::KEY_DELAYED_QUEUE . ':' . $this->queueName,
            $priorityScore,
            $taskData
        ) > 0;
    }
}
?>

4.2 任务结果回调机制


<?php
/**
 * 支持回调的任务类
 */
trait TaskCallbackTrait
{
    private $successCallback;
    private $failureCallback;
    
    public function onSuccess(callable $callback): self
    {
        $this->successCallback = $callback;
        return $this;
    }
    
    public function onFailure(callable $callback): self
    {
        $this->failureCallback = $callback;
        return $this;
    }
    
    protected function triggerSuccess($result): void
    {
        if ($this->successCallback) {
            call_user_func($this->successCallback, $result, $this);
        }
    }
    
    protected function triggerFailure(string $error): void
    {
        if ($this->failureCallback) {
            call_user_func($this->failureCallback, $error, $this);
        }
    }
}
?>

五、监控与维护

5.1 队列状态监控


<?php
/**
 * 队列监控器
 */
class QueueMonitor
{
    private $redis;
    
    public function getQueueStats(string $queueName): array
    {
        $prefixes = [
            'delayed' => RedisQueueManager::KEY_DELAYED_QUEUE,
            'ready' => RedisQueueManager::KEY_READY_QUEUE,
            'failed' => RedisQueueManager::KEY_FAILED_QUEUE,
            'processing' => RedisQueueManager::KEY_PROCESSING_QUEUE
        ];
        
        $stats = [];
        foreach ($prefixes as $key => $prefix) {
            $fullKey = $prefix . ':' . $queueName;
            
            if ($key === 'delayed') {
                $stats[$key] = $this->redis->zCard($fullKey);
            } else {
                $stats[$key] = $this->redis->lLen($fullKey);
            }
        }
        
        return [
            'queue_name' => $queueName,
            'stats' => $stats,
            'timestamp' => time(),
            'memory_usage' => $this->redis->info('memory')['used_memory'] ?? 0
        ];
    }
    
    public function getFailedTasks(string $queueName, int $limit = 100): array
    {
        $failedKey = RedisQueueManager::KEY_FAILED_QUEUE . ':' . $queueName;
        $tasks = $this->redis->lRange($failedKey, 0, $limit - 1);
        
        return array_map(function($task) {
            $data = json_decode($task, true);
            return [
                'failed_at' => date('Y-m-d H:i:s', $data['failed_at']),
                'error' => $data['error'],
                'retry_count' => $data['retry_count'] ?? 0,
                'task_data' => json_decode($data['task'], true)
            ];
        }, $tasks);
    }
}
?>

5.2 Supervisor配置


; /etc/supervisor/conf.d/async-worker.conf
[program:async-worker]
command=php /path/to/worker.php
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
startretries=3
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/async-worker.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10

5.3 性能优化建议

  1. 连接池:使用Redis连接池减少连接开销
  2. 管道技术:批量操作使用pipeline减少网络往返
  3. Lua脚本:复杂操作用Lua保证原子性
  4. 内存优化:定期清理已完成任务数据
  5. 监控告警:设置队列积压告警阈值

PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级编程教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis的延迟任务系统设计与实现 | PHP高级编程教程 https://www.taomawang.com/server/php/1513.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务