PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统 | 原创教程

2026-02-21 0 717
免费资源下载

原创技术教程 | 最后更新:2023年10月

一、异步任务队列的核心价值

在现代Web应用中,用户请求的即时响应至关重要。然而,某些操作如邮件发送、图片处理、数据报表生成等耗时任务会阻塞请求流程。异步任务队列通过将耗时操作从主请求流程中解耦,显著提升系统响应速度和吞吐量。

传统同步处理的问题:

  • 用户等待时间过长,体验差
  • 并发处理能力受限
  • 单点故障导致任务丢失
  • 资源利用率不均衡

异步队列的优势:

  • 请求响应时间从秒级降至毫秒级
  • 支持任务重试和失败处理
  • 实现削峰填谷,提高系统稳定性
  • 便于横向扩展

二、系统架构设计

我们设计一个基于Redis的轻量级任务队列系统,包含以下核心组件:

2.1 架构图

    +----------------+     +---------------+     +-------------------+
    |   Web应用      |---->|  Redis队列    |---->|    Worker进程     |
    |   (生产者)     |     |   (消息中间件) |     |    (消费者)       |
    +----------------+     +---------------+     +-------------------+
          |                       |                       |
          | 提交任务              | 存储任务              | 执行任务
          v                       v                       v
    +----------------+     +---------------+     +-------------------+
    |   任务状态查询  |<----|  任务状态存储  |<----|   任务结果回写   |
    +----------------+     +---------------+     +-------------------+
                

2.2 队列设计

  • 等待队列:存放待处理任务
  • 处理中队列:正在执行的任务
  • 完成队列:已成功完成的任务
  • 失败队列:执行失败的任务

三、完整代码实现

3.1 环境要求

PHP >= 7.4
Redis >= 5.0
Composer (用于安装predis/predis)
                

3.2 安装依赖

composer require predis/predis
                

3.3 核心队列类实现

<?php
/**
 * Redis异步任务队列核心类
 */
class AsyncTaskQueue
{
    private $redis;
    private $queuePrefix = 'async_queue:';
    
    public function __construct($config = [])
    {
        $defaultConfig = [
            'host' => '127.0.0.1',
            'port' => 6379,
            'database' => 0,
            'password' => null,
            'timeout' => 5.0
        ];
        
        $config = array_merge($defaultConfig, $config);
        
        $this->redis = new PredisClient([
            'scheme' => 'tcp',
            'host' => $config['host'],
            'port' => $config['port'],
            'database' => $config['database'],
            'password' => $config['password'],
            'timeout' => $config['timeout']
        ]);
    }
    
    /**
     * 提交任务到队列
     */
    public function push(string $queueName, array $task, int $delay = 0): string
    {
        $taskId = uniqid('task_', true);
        $taskData = [
            'id' => $taskId,
            'data' => $task,
            'created_at' => time(),
            'attempts' => 0,
            'status' => 'pending',
            'queue' => $queueName
        ];
        
        $taskKey = $this->queuePrefix . 'task:' . $taskId;
        
        // 存储任务详情
        $this->redis->hmset($taskKey, $taskData);
        
        if ($delay > 0) {
            // 延迟任务
            $score = time() + $delay;
            $this->redis->zadd($this->queuePrefix . 'delayed:' . $queueName, [$taskId => $score]);
        } else {
            // 立即执行任务
            $this->redis->lpush($this->queuePrefix . 'waiting:' . $queueName, $taskId);
        }
        
        return $taskId;
    }
    
    /**
     * 从队列获取任务
     */
    public function pop(string $queueName, int $timeout = 5): ?array
    {
        // 检查延迟队列
        $this->moveDelayedTasks($queueName);
        
        // 从等待队列获取任务
        $result = $this->redis->brpop(
            $this->queuePrefix . 'waiting:' . $queueName,
            $timeout
        );
        
        if (!$result) {
            return null;
        }
        
        $taskId = $result[1];
        
        // 获取任务详情
        $taskData = $this->redis->hgetall($this->queuePrefix . 'task:' . $taskId);
        
        if (empty($taskData)) {
            return null;
        }
        
        // 更新任务状态
        $this->redis->hmset($this->queuePrefix . 'task:' . $taskId, [
            'status' => 'processing',
            'started_at' => time(),
            'attempts' => intval($taskData['attempts'] ?? 0) + 1
        ]);
        
        // 添加到处理中队列
        $this->redis->lpush($this->queuePrefix . 'processing:' . $queueName, $taskId);
        
        return [
            'id' => $taskId,
            'data' => json_decode($taskData['data'] ?? '[]', true),
            'queue' => $queueName
        ];
    }
    
    /**
     * 完成任务
     */
    public function complete(string $taskId, $result = null): bool
    {
        $taskKey = $this->queuePrefix . 'task:' . $taskId;
        $taskData = $this->redis->hgetall($taskKey);
        
        if (empty($taskData)) {
            return false;
        }
        
        // 更新任务状态
        $updateData = [
            'status' => 'completed',
            'completed_at' => time(),
            'result' => json_encode($result)
        ];
        
        $this->redis->hmset($taskKey, $updateData);
        
        // 从处理中队列移除
        $queueName = $taskData['queue'];
        $this->redis->lrem($this->queuePrefix . 'processing:' . $queueName, 0, $taskId);
        
        // 添加到完成队列
        $this->redis->lpush($this->queuePrefix . 'completed:' . $queueName, $taskId);
        
        // 设置过期时间(7天后自动删除)
        $this->redis->expire($taskKey, 604800);
        
        return true;
    }
    
    /**
     * 任务失败处理
     */
    public function fail(string $taskId, string $error, int $maxRetries = 3): bool
    {
        $taskKey = $this->queuePrefix . 'task:' . $taskId;
        $taskData = $this->redis->hgetall($taskKey);
        
        if (empty($taskData)) {
            return false;
        }
        
        $attempts = intval($taskData['attempts'] ?? 0);
        $queueName = $taskData['queue'];
        
        if ($attempts >= $maxRetries) {
            // 超过重试次数,标记为失败
            $updateData = [
                'status' => 'failed',
                'failed_at' => time(),
                'error' => $error,
                'final_attempt' => $attempts
            ];
            
            $this->redis->hmset($taskKey, $updateData);
            
            // 添加到失败队列
            $this->redis->lpush($this->queuePrefix . 'failed:' . $queueName, $taskId);
        } else {
            // 重新加入队列重试
            $delay = min(300, pow(2, $attempts) * 60); // 指数退避
            $this->redis->zadd(
                $this->queuePrefix . 'delayed:' . $queueName,
                [$taskId => time() + $delay]
            );
            
            $updateData = [
                'status' => 'retrying',
                'last_error' => $error,
                'next_retry_at' => time() + $delay
            ];
            
            $this->redis->hmset($taskKey, $updateData);
        }
        
        // 从处理中队列移除
        $this->redis->lrem($this->queuePrefix . 'processing:' . $queueName, 0, $taskId);
        
        return true;
    }
    
    /**
     * 移动延迟任务到等待队列
     */
    private function moveDelayedTasks(string $queueName): void
    {
        $delayedKey = $this->queuePrefix . 'delayed:' . $queueName;
        $now = time();
        
        // 获取所有到期的任务
        $tasks = $this->redis->zrangebyscore($delayedKey, 0, $now);
        
        if (!empty($tasks)) {
            foreach ($tasks as $taskId) {
                // 添加到等待队列
                $this->redis->lpush($this->queuePrefix . 'waiting:' . $queueName, $taskId);
                // 从延迟队列移除
                $this->redis->zrem($delayedKey, $taskId);
            }
        }
    }
    
    /**
     * 获取队列统计信息
     */
    public function getStats(string $queueName): array
    {
        return [
            'waiting' => $this->redis->llen($this->queuePrefix . 'waiting:' . $queueName),
            'processing' => $this->redis->llen($this->queuePrefix . 'processing:' . $queueName),
            'completed' => $this->redis->llen($this->queuePrefix . 'completed:' . $queueName),
            'failed' => $this->redis->llen($this->queuePrefix . 'failed:' . $queueName),
            'delayed' => $this->redis->zcard($this->queuePrefix . 'delayed:' . $queueName)
        ];
    }
}
                

3.4 Worker进程实现

<?php
/**
 * 任务Worker类
 */
class TaskWorker
{
    private $queue;
    private $handlers = [];
    private $running = false;
    
    public function __construct(AsyncTaskQueue $queue)
    {
        $this->queue = $queue;
    }
    
    /**
     * 注册任务处理器
     */
    public function registerHandler(string $taskType, callable $handler): void
    {
        $this->handlers[$taskType] = $handler;
    }
    
    /**
     * 启动Worker
     */
    public function start(string $queueName, array $options = []): void
    {
        $defaultOptions = [
            'sleep' => 1, // 队列空时休眠时间
            'max_memory' => 128 * 1024 * 1024, // 最大内存限制
            'max_tasks' => 1000, // 最大处理任务数
            'stop_file' => '/tmp/stop_worker' // 停止信号文件
        ];
        
        $options = array_merge($defaultOptions, $options);
        
        $this->running = true;
        $processed = 0;
        
        echo "Worker started on queue: {$queueName}n";
        
        // 注册信号处理器
        pcntl_signal(SIGTERM, [$this, 'stop']);
        pcntl_signal(SIGINT, [$this, 'stop']);
        
        while ($this->running) {
            // 检查停止信号
            if (file_exists($options['stop_file'])) {
                echo "Stop signal detected, shutting down...n";
                $this->running = false;
                break;
            }
            
            // 检查内存限制
            if (memory_get_usage(true) > $options['max_memory']) {
                echo "Memory limit reached, restarting...n";
                $this->running = false;
                break;
            }
            
            // 处理任务
            $task = $this->queue->pop($queueName);
            
            if ($task === null) {
                // 队列为空,休眠
                sleep($options['sleep']);
                continue;
            }
            
            try {
                echo "Processing task: {$task['id']}n";
                
                // 执行任务
                $result = $this->executeTask($task);
                
                // 标记任务完成
                $this->queue->complete($task['id'], $result);
                
                echo "Task completed: {$task['id']}n";
                
            } catch (Exception $e) {
                echo "Task failed: {$task['id']} - {$e->getMessage()}n";
                
                // 标记任务失败
                $this->queue->fail($task['id'], $e->getMessage());
            }
            
            $processed++;
            
            // 检查最大任务数限制
            if ($processed >= $options['max_tasks']) {
                echo "Reached max tasks limit, restarting...n";
                $this->running = false;
                break;
            }
            
            // 处理信号
            pcntl_signal_dispatch();
        }
        
        echo "Worker stopped. Processed {$processed} tasks.n";
    }
    
    /**
     * 执行具体任务
     */
    private function executeTask(array $task)
    {
        $taskData = $task['data'];
        $taskType = $taskData['type'] ?? 'default';
        
        if (!isset($this->handlers[$taskType])) {
            throw new Exception("No handler registered for task type: {$taskType}");
        }
        
        return call_user_func($this->handlers[$taskType], $taskData);
    }
    
    /**
     * 停止Worker
     */
    public function stop(): void
    {
        $this->running = false;
    }
}
                

3.5 实际应用示例

<?php
// 示例:用户注册后的异步任务处理
require_once 'AsyncTaskQueue.php';
require_once 'TaskWorker.php';

// 初始化队列
$queue = new AsyncTaskQueue([
    'host' => 'localhost',
    'port' => 6379
]);

// 用户注册处理函数
function handleUserRegistration($data)
{
    $userId = $data['user_id'];
    $email = $data['email'];
    
    // 1. 发送欢迎邮件
    sendWelcomeEmail($email, $userId);
    
    // 2. 创建用户统计记录
    createUserStatistics($userId);
    
    // 3. 推荐系统初始化
    initRecommendationSystem($userId);
    
    // 4. 发送管理员通知
    notifyAdministrator($userId);
    
    return [
        'user_id' => $userId,
        'processed_at' => date('Y-m-d H:i:s'),
        'steps_completed' => 4
    ];
}

// 邮件发送任务
function sendWelcomeEmail($email, $userId)
{
    // 模拟邮件发送
    sleep(2);
    echo "Welcome email sent to: {$email}n";
    
    // 实际应用中这里调用邮件发送服务
    // mail($email, 'Welcome', 'Welcome to our service!');
}

// 其他处理函数...
function createUserStatistics($userId) { /* ... */ }
function initRecommendationSystem($userId) { /* ... */ }
function notifyAdministrator($userId) { /* ... */ }

// Web应用中的使用(生产者)
if ($_SERVER['REQUEST_METHOD'] === 'POST' && isset($_POST['register'])) {
    // 用户注册逻辑
    $userId = createUser($_POST);
    
    // 提交异步任务
    $taskId = $queue->push('user_tasks', [
        'type' => 'user_registration',
        'user_id' => $userId,
        'email' => $_POST['email'],
        'registration_time' => time()
    ]);
    
    // 立即返回响应
    echo json_encode([
        'success' => true,
        'message' => 'Registration successful',
        'task_id' => $taskId,
        'user_id' => $userId
    ]);
    
    exit;
}

// Worker启动脚本(消费者)
if (php_sapi_name() === 'cli' && isset($argv[1]) && $argv[1] === 'worker') {
    $worker = new TaskWorker($queue);
    
    // 注册任务处理器
    $worker->registerHandler('user_registration', 'handleUserRegistration');
    
    // 启动Worker
    $worker->start('user_tasks', [
        'sleep' => 2,
        'max_memory' => 256 * 1024 * 1024
    ]);
}
                

四、高级特性与优化

4.1 优先级队列实现

// 在AsyncTaskQueue类中添加
public function pushWithPriority(string $queueName, array $task, int $priority = 5): string
{
    $taskId = uniqid('task_', true);
    $taskData = [
        'id' => $taskId,
        'data' => $task,
        'priority' => $priority,
        'created_at' => time()
    ];
    
    $taskKey = $this->queuePrefix . 'task:' . $taskId;
    $this->redis->hmset($taskKey, $taskData);
    
    // 使用有序集合实现优先级
    $this->redis->zadd(
        $this->queuePrefix . 'priority:' . $queueName,
        [$taskId => $priority]
    );
    
    return $taskId;
}

public function popPriority(string $queueName): ?array
{
    // 获取最高优先级的任务
    $tasks = $this->redis->zrevrange(
        $this->queuePrefix . 'priority:' . $queueName,
        0,
        0,
        ['WITHSCORES' => true]
    );
    
    if (empty($tasks)) {
        return null;
    }
    
    $taskId = array_key_first($tasks);
    $this->redis->zrem($this->queuePrefix . 'priority:' . $queueName, $taskId);
    
    // 获取任务详情
    return $this->getTask($taskId);
}
                

4.2 批量任务处理

public function processBatch(string $queueName, int $batchSize = 10): array
{
    $tasks = [];
    
    for ($i = 0; $i pop($queueName, 1);
        if ($task) {
            $tasks[] = $task;
        } else {
            break;
        }
    }
    
    return $tasks;
}

// Worker中的批量处理
public function processBatchTasks(array $tasks): void
{
    // 并行处理任务
    $promises = [];
    
    foreach ($tasks as $task) {
        $promises[] = $this->executeTaskAsync($task);
    }
    
    // 等待所有任务完成
    $results = wait($promises);
    
    foreach ($results as $index => $result) {
        if ($result['success']) {
            $this->queue->complete($tasks[$index]['id'], $result['data']);
        } else {
            $this->queue->fail($tasks[$index]['id'], $result['error']);
        }
    }
}
                

4.3 任务依赖关系

public function pushWithDependencies(string $queueName, array $task, array $dependencies): string
{
    $taskId = uniqid('task_', true);
    
    // 存储依赖关系
    $dependencyKey = $this->queuePrefix . 'deps:' . $taskId;
    foreach ($dependencies as $depTaskId) {
        $this->redis->sadd($dependencyKey, $depTaskId);
    }
    
    // 设置任务状态为等待依赖
    $taskData = [
        'id' => $taskId,
        'data' => $task,
        'status' => 'waiting_deps',
        'dependencies' => json_encode($dependencies)
    ];
    
    $this->redis->hmset($this->queuePrefix . 'task:' . $taskId, $taskData);
    
    return $taskId;
}

public function checkDependencies(string $taskId): bool
{
    $dependencyKey = $this->queuePrefix . 'deps:' . $taskId;
    $dependencies = $this->redis->smembers($dependencyKey);
    
    foreach ($dependencies as $depTaskId) {
        $depStatus = $this->redis->hget($this->queuePrefix . 'task:' . $depTaskId, 'status');
        if ($depStatus !== 'completed') {
            return false;
        }
    }
    
    // 所有依赖都完成,可以执行任务
    $this->redis->del($dependencyKey);
    return true;
}
                

五、监控与管理

5.1 监控面板实现

<?php
// queue_monitor.php
class QueueMonitor
{
    private $queue;
    
    public function __construct(AsyncTaskQueue $queue)
    {
        $this->queue = $queue;
    }
    
    public function getDashboardData(): array
    {
        $queues = ['default', 'high_priority', 'emails', 'reports'];
        $data = [];
        
        foreach ($queues as $queue) {
            $data[$queue] = [
                'stats' => $this->queue->getStats($queue),
                'throughput' => $this->getThroughput($queue),
                'avg_processing_time' => $this->getAvgProcessingTime($queue)
            ];
        }
        
        return $data;
    }
    
    public function getFailedTasks(string $queueName, int $limit = 50): array
    {
        $failedKey = $this->queue->getQueuePrefix() . 'failed:' . $queueName;
        $taskIds = $this->queue->getRedis()->lrange($failedKey, 0, $limit - 1);
        
        $tasks = [];
        foreach ($taskIds as $taskId) {
            $taskData = $this->queue->getRedis()->hgetall(
                $this->queue->getQueuePrefix() . 'task:' . $taskId
            );
            if ($taskData) {
                $tasks[] = $taskData;
            }
        }
        
        return $tasks;
    }
    
    public function retryFailedTask(string $taskId): bool
    {
        $taskKey = $this->queue->getQueuePrefix() . 'task:' . $taskId;
        $taskData = $this->queue->getRedis()->hgetall($taskKey);
        
        if (!$taskData || ($taskData['status'] ?? '') !== 'failed') {
            return false;
        }
        
        // 重新加入队列
        $queueName = $taskData['queue'];
        $this->queue->getRedis()->lpush(
            $this->queue->getQueuePrefix() . 'waiting:' . $queueName,
            $taskId
        );
        
        // 更新状态
        $this->queue->getRedis()->hmset($taskKey, [
            'status' => 'pending',
            'retried_at' => time(),
            'error' => ''
        ]);
        
        // 从失败队列移除
        $this->queue->getRedis()->lrem(
            $this->queue->getQueuePrefix() . 'failed:' . $queueName,
            0,
            $taskId
        );
        
        return true;
    }
}
                

5.2 性能优化建议

  • 连接池管理:使用连接池减少Redis连接开销
  • 管道化操作:批量Redis命令减少网络往返
  • 内存优化:定期清理已完成的任务数据
  • 持久化策略:重要任务数据定期备份
  • 监控告警:设置队列长度阈值告警

5.3 生产环境部署

# Supervisor配置示例
[program:async_worker]
command=php /path/to/worker.php worker
process_name=%(program_name)s_%(process_num)02d
numprocs=4
directory=/path/to
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/worker.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
                

总结

本文详细介绍了基于Redis的PHP异步任务队列系统的完整实现。通过这个系统,您可以:

  1. 将耗时操作异步化,提升Web应用响应速度
  2. 实现任务的可靠执行,支持重试和失败处理
  3. 构建可扩展的任务处理架构
  4. 通过监控面板实时掌握系统状态

这个解决方案特别适合处理邮件发送、数据报表生成、图片处理、批量数据导入等后台任务场景。您可以根据实际需求进一步扩展功能,如添加任务优先级、任务依赖、分布式Worker等高级特性。

PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统 | 原创教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列实战:基于Redis实现高性能后台任务处理系统 | 原创教程 https://www.taomawang.com/server/php/1613.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务