PHP异步任务队列系统实战:基于Redis与Swoole的高性能解决方案 | PHP高级编程教程

2026-01-15 0 841
免费资源下载
作者:后端架构师
发布日期:2023年11月
难度:高级

引言:为什么需要异步任务队列?

在现代Web应用中,许多耗时操作如邮件发送、图片处理、数据同步等会阻塞用户请求,影响用户体验。本文将构建一个基于Redis和Swoole的高性能异步任务队列系统,实现任务的解耦和异步处理。

技术栈:PHP 8.1+、Redis 6+、Swoole 4.8+、Composer

系统架构设计

架构图说明

生产者 → Redis队列 → 消费者Worker → 任务处理器 → 结果存储
    ↑           ↑           ↑
    └─ 监控系统 ─┴─ 日志系统 ─┘
                    

核心组件

  • 任务生产者(Producer):生成任务并推送到队列
  • Redis队列:存储待处理任务
  • 任务消费者(Consumer):从队列获取并处理任务
  • 任务监控器(Monitor):监控队列状态和任务执行情况

完整实现教程

步骤1:环境准备与依赖安装

# composer.json
{
    "require": {
        "php": ">=8.1",
        "ext-redis": "*",
        "ext-swoole": ">=4.8.0",
        "monolog/monolog": "^2.0"
    },
    "autoload": {
        "psr-4": {
            "AsyncQueue\": "src/"
        }
    }
}
# 安装依赖
composer install

# 创建项目结构
mkdir -p src/{Core,Job,Worker,Exception}
mkdir -p config logs storage

步骤2:核心队列类实现

// src/Core/Queue.php
<?php

namespace AsyncQueueCore;

use Redis;
use AsyncQueueExceptionQueueException;

class Queue
{
    private Redis $redis;
    private string $queueName;
    private array $config;
    
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->queueName = $config['queue_name'] ?? 'async_queue';
        $this->initRedis();
    }
    
    private function initRedis(): void
    {
        $this->redis = new Redis();
        
        $connected = $this->redis->connect(
            $this->config['host'] ?? '127.0.0.1',
            $this->config['port'] ?? 6379,
            $this->config['timeout'] ?? 2.0
        );
        
        if (!$connected) {
            throw new QueueException('Redis连接失败');
        }
        
        if (!empty($this->config['password'])) {
            $this->redis->auth($this->config['password']);
        }
        
        if (!empty($this->config['database'])) {
            $this->redis->select($this->config['database']);
        }
    }
    
    /**
     * 推送任务到队列
     */
    public function push(string $jobClass, array $data, int $delay = 0): string
    {
        $jobId = uniqid('job_', true);
        $jobData = [
            'id' => $jobId,
            'class' => $jobClass,
            'data' => $data,
            'created_at' => time(),
            'attempts' => 0,
            'max_attempts' => $this->config['max_attempts'] ?? 3
        ];
        
        $serialized = serialize($jobData);
        
        if ($delay > 0) {
            // 延迟队列使用有序集合
            $score = time() + $delay;
            $this->redis->zAdd("{$this->queueName}:delayed", $score, $serialized);
        } else {
            // 立即执行队列
            $this->redis->lPush($this->queueName, $serialized);
        }
        
        return $jobId;
    }
    
    /**
     * 从队列获取任务
     */
    public function pop(): ?array
    {
        // 检查延迟队列
        $this->moveDelayedJobs();
        
        $serialized = $this->redis->rPop($this->queueName);
        
        if (!$serialized) {
            return null;
        }
        
        $jobData = unserialize($serialized);
        
        if (!is_array($jobData)) {
            throw new QueueException('任务数据反序列化失败');
        }
        
        return $jobData;
    }
    
    /**
     * 将到期的延迟任务移到执行队列
     */
    private function moveDelayedJobs(): void
    {
        $now = time();
        $items = $this->redis->zRangeByScore(
            "{$this->queueName}:delayed",
            0,
            $now
        );
        
        if (!empty($items)) {
            $this->redis->multi();
            foreach ($items as $item) {
                $this->redis->lPush($this->queueName, $item);
                $this->redis->zRem("{$this->queueName}:delayed", $item);
            }
            $this->redis->exec();
        }
    }
    
    /**
     * 获取队列长度
     */
    public function size(): int
    {
        return $this->redis->lLen($this->queueName);
    }
    
    /**
     * 获取延迟队列长度
     */
    public function delayedSize(): int
    {
        return $this->redis->zCard("{$this->queueName}:delayed");
    }
}

步骤3:任务基类与处理器

// src/Job/JobInterface.php
<?php

namespace AsyncQueueJob;

interface JobInterface
{
    /**
     * 执行任务
     */
    public function handle(array $data): bool;
    
    /**
     * 任务失败时的处理
     */
    public function failed(array $data, Throwable $exception): void;
}

// src/Job/AbstractJob.php
<?php

namespace AsyncQueueJob;

abstract class AbstractJob implements JobInterface
{
    protected int $timeout = 60;
    protected int $maxAttempts = 3;
    
    public function getTimeout(): int
    {
        return $this->timeout;
    }
    
    public function getMaxAttempts(): int
    {
        return $this->maxAttempts;
    }
}

// 示例任务:发送邮件
// src/Job/SendEmailJob.php
<?php

namespace AsyncQueueJob;

class SendEmailJob extends AbstractJob
{
    public function handle(array $data): bool
    {
        // 模拟邮件发送
        $to = $data['to'] ?? '';
        $subject = $data['subject'] ?? '';
        $body = $data['body'] ?? '';
        
        if (empty($to)) {
            throw new InvalidArgumentException('收件人邮箱不能为空');
        }
        
        // 实际发送邮件逻辑
        // mail($to, $subject, $body);
        
        sleep(2); // 模拟耗时操作
        
        echo "邮件发送成功: {$to}n";
        
        return true;
    }
    
    public function failed(array $data, Throwable $exception): void
    {
        // 记录失败日志或发送告警
        error_log("邮件发送失败: " . $exception->getMessage());
        
        // 可以在这里实现重试逻辑或通知管理员
        $this->notifyAdmin($data, $exception);
    }
    
    private function notifyAdmin(array $data, Throwable $exception): void
    {
        // 通知管理员逻辑
    }
}

步骤4:基于Swoole的Worker实现

// src/Worker/Worker.php
<?php

namespace AsyncQueueWorker;

use AsyncQueueCoreQueue;
use AsyncQueueJobJobInterface;
use SwooleProcess;
use SwooleTimer;

class Worker
{
    private Queue $queue;
    private array $config;
    private bool $shouldStop = false;
    private int $processedCount = 0;
    private int $startTime;
    
    public function __construct(Queue $queue, array $config = [])
    {
        $this->queue = $queue;
        $this->config = array_merge([
            'sleep' => 1,
            'max_memory' => 128 * 1024 * 1024, // 128MB
            'max_jobs' => 1000,
            'time_limit' => 3600, // 1小时
        ], $config);
        
        $this->startTime = time();
    }
    
    /**
     * 启动Worker进程
     */
    public function start(): void
    {
        $this->registerSignalHandlers();
        
        echo "Worker启动,PID: " . getmypid() . "n";
        echo "队列: " . $this->config['queue_name'] . "n";
        
        while (!$this->shouldStop) {
            $this->checkLimits();
            
            $jobData = $this->queue->pop();
            
            if ($jobData === null) {
                // 队列为空,等待
                sleep($this->config['sleep']);
                continue;
            }
            
            $this->processJob($jobData);
            $this->processedCount++;
            
            // 每处理10个任务检查一次内存
            if ($this->processedCount % 10 === 0) {
                $this->checkMemory();
            }
        }
        
        echo "Worker正常停止n";
    }
    
    /**
     * 处理单个任务
     */
    private function processJob(array $jobData): void
    {
        $jobId = $jobData['id'];
        $jobClass = $jobData['class'];
        $data = $jobData['data'];
        $attempts = $jobData['attempts'] + 1;
        
        echo "处理任务: {$jobId} (尝试: {$attempts})n";
        
        try {
            if (!class_exists($jobClass)) {
                throw new RuntimeException("任务类不存在: {$jobClass}");
            }
            
            $job = new $jobClass();
            
            if (!$job instanceof JobInterface) {
                throw new RuntimeException("任务类必须实现JobInterface");
            }
            
            // 设置执行超时
            $timeout = $job->getTimeout();
            $result = $this->executeWithTimeout(
                fn() => $job->handle($data),
                $timeout
            );
            
            if ($result) {
                echo "任务完成: {$jobId}n";
            }
            
        } catch (Throwable $e) {
            echo "任务失败: {$jobId} - " . $e->getMessage() . "n";
            
            // 重试逻辑
            if ($attempts getMessage();
                
                // 延迟重试:指数退避
                $delay = min(300, pow(2, $attempts) * 5); // 最大5分钟
                $this->queue->push($jobClass, $data, $delay);
                echo "任务已安排重试,延迟 {$delay}秒n";
            } else {
                // 最终失败
                if (isset($job) && $job instanceof JobInterface) {
                    $job->failed($data, $e);
                }
                echo "任务最终失败,已达最大重试次数n";
            }
        }
    }
    
    /**
     * 带超时的任务执行
     */
    private function executeWithTimeout(callable $callback, int $timeout): mixed
    {
        if (!extension_loaded('swoole')) {
            return $callback();
        }
        
        $channel = new SwooleCoroutineChannel(1);
        
        go(function () use ($channel, $callback) {
            try {
                $result = $callback();
                $channel->push(['success' => true, 'result' => $result]);
            } catch (Throwable $e) {
                $channel->push(['success' => false, 'exception' => $e]);
            }
        });
        
        $result = $channel->pop($timeout);
        
        if ($result === false) {
            throw new RuntimeException("任务执行超时 ({$timeout}秒)");
        }
        
        if (!$result['success']) {
            throw $result['exception'];
        }
        
        return $result['result'];
    }
    
    /**
     * 检查内存使用
     */
    private function checkMemory(): void
    {
        $memory = memory_get_usage(true);
        $maxMemory = $this->config['max_memory'];
        
        if ($memory > $maxMemory) {
            echo "内存使用过高: " . round($memory / 1024 / 1024, 2) . "MBn";
            $this->shouldStop = true;
        }
    }
    
    /**
     * 检查运行限制
     */
    private function checkLimits(): void
    {
        // 检查时间限制
        if (time() - $this->startTime > $this->config['time_limit']) {
            echo "达到时间限制,停止Workern";
            $this->shouldStop = true;
        }
        
        // 检查处理任务数量限制
        if ($this->processedCount >= $this->config['max_jobs']) {
            echo "达到最大任务处理数,停止Workern";
            $this->shouldStop = true;
        }
    }
    
    /**
     * 注册信号处理器
     */
    private function registerSignalHandlers(): void
    {
        if (extension_loaded('pcntl')) {
            pcntl_async_signals(true);
            pcntl_signal(SIGTERM, fn() => $this->shouldStop = true);
            pcntl_signal(SIGINT, fn() => $this->shouldStop = true);
        }
    }
}

步骤5:任务生产者与监控系统

// producer.php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use AsyncQueueCoreQueue;
use AsyncQueueJobSendEmailJob;

$config = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'queue_name' => 'async_queue',
    'max_attempts' => 3
];

$queue = new Queue($config);

// 生产邮件发送任务
$jobId = $queue->push(SendEmailJob::class, [
    'to' => 'user@example.com',
    'subject' => '欢迎注册',
    'body' => '感谢您注册我们的服务!'
]);

echo "任务已创建: {$jobId}n";

// 生产延迟任务(5分钟后执行)
$delayedJobId = $queue->push(SendEmailJob::class, [
    'to' => 'user@example.com',
    'subject' => '提醒通知',
    'body' => '这是您的提醒消息'
], 300); // 300秒 = 5分钟

echo "延迟任务已创建: {$delayedJobId}n";

// monitor.php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use AsyncQueueCoreQueue;

class QueueMonitor
{
    private Queue $queue;
    
    public function __construct(Queue $queue)
    {
        $this->queue = $queue;
    }
    
    public function getStats(): array
    {
        return [
            'pending' => $this->queue->size(),
            'delayed' => $this->queue->delayedSize(),
            'memory_usage' => round(memory_get_usage(true) / 1024 / 1024, 2) . 'MB',
            'timestamp' => date('Y-m-d H:i:s')
        ];
    }
    
    public function runWebMonitor(int $port = 8080): void
    {
        if (!extension_loaded('swoole')) {
            echo "需要Swoole扩展来运行Web监控n";
            return;
        }
        
        $http = new SwooleHttpServer('0.0.0.0', $port);
        
        $http->on('request', function ($request, $response) {
            $stats = $this->getStats();
            
            $response->header('Content-Type', 'application/json');
            $response->end(json_encode([
                'status' => 'success',
                'data' => $stats
            ], JSON_PRETTY_PRINT));
        });
        
        echo "监控服务启动: http://0.0.0.0:{$port}n";
        $http->start();
    }
}

// 使用示例
$config = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'queue_name' => 'async_queue'
];

$queue = new Queue($config);
$monitor = new QueueMonitor($queue);

// 命令行查看状态
print_r($monitor->getStats());

// 启动Web监控(可选)
// $monitor->runWebMonitor();

步骤6:部署与进程管理

# worker.sh - Worker启动脚本
#!/bin/bash

BASE_DIR=$(cd "$(dirname "$0")"; pwd)
PID_FILE="$BASE_DIR/storage/worker.pid"
LOG_FILE="$BASE_DIR/logs/worker.log"

start() {
    if [ -f "$PID_FILE" ]; then
        PID=$(cat "$PID_FILE")
        if kill -0 $PID 2>/dev/null; then
            echo "Worker已经在运行 (PID: $PID)"
            return 1
        fi
    fi
    
    nohup php "$BASE_DIR/worker.php" >> "$LOG_FILE" 2>&1 &
    PID=$!
    echo $PID > "$PID_FILE"
    echo "Worker启动成功 (PID: $PID)"
}

stop() {
    if [ ! -f "$PID_FILE" ]; then
        echo "Worker未运行"
        return 1
    fi
    
    PID=$(cat "$PID_FILE")
    if kill -0 $PID 2>/dev/null; then
        kill -TERM $PID
        rm "$PID_FILE"
        echo "Worker已停止"
    else
        echo "Worker进程不存在,清理PID文件"
        rm "$PID_FILE"
    fi
}

case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    restart)
        stop
        sleep 2
        start
        ;;
    *)
        echo "用法: $0 {start|stop|restart}"
        exit 1
        ;;
esac

# worker.php - Worker入口文件
<?php

require_once __DIR__ . '/vendor/autoload.php';

use AsyncQueueCoreQueue;
use AsyncQueueWorkerWorker;

// 加载配置
$config = require __DIR__ . '/config/queue.php';

$queue = new Queue($config);
$worker = new Worker($queue, $config);

// 设置错误处理器
set_error_handler(function($errno, $errstr, $errfile, $errline) {
    error_log("Worker错误: [$errno] $errstr in $errfile:$errline");
    return true;
});

// 设置异常处理器
set_exception_handler(function($exception) {
    error_log("Worker异常: " . $exception->getMessage());
    exit(1);
});

// 启动Worker
$worker->start();

性能优化与高级特性

1. 连接池优化

// src/Core/ConnectionPool.php
<?php

namespace AsyncQueueCore;

use SwooleCoroutineChannel;

class ConnectionPool
{
    private Channel $pool;
    private int $size;
    private string $host;
    private int $port;
    
    public function __construct(string $host, int $port, int $size = 10)
    {
        $this->host = $host;
        $this->port = $port;
        $this->size = $size;
        $this->pool = new Channel($size);
        
        $this->initPool();
    }
    
    private function initPool(): void
    {
        for ($i = 0; $i size; $i++) {
            $redis = new Redis();
            $redis->connect($this->host, $this->port);
            $this->pool->push($redis);
        }
    }
    
    public function get(): Redis
    {
        return $this->pool->pop();
    }
    
    public function put(Redis $redis): void
    {
        $this->pool->push($redis);
    }
}

2. 批量任务处理

// src/Worker/BatchWorker.php
<?php

namespace AsyncQueueWorker;

class BatchWorker extends Worker
{
    private int $batchSize = 10;
    
    public function setBatchSize(int $size): void
    {
        $this->batchSize = max(1, $size);
    }
    
    protected function processBatch(): void
    {
        $jobs = [];
        
        for ($i = 0; $i batchSize; $i++) {
            $jobData = $this->queue->pop();
            if ($jobData === null) {
                break;
            }
            $jobs[] = $jobData;
        }
        
        if (empty($jobs)) {
            return;
        }
        
        // 并行处理批量任务
        $this->processJobsParallel($jobs);
    }
    
    private function processJobsParallel(array $jobs): void
    {
        $channels = [];
        
        foreach ($jobs as $index => $jobData) {
            $channel = new SwooleCoroutineChannel(1);
            $channels[$index] = $channel;
            
            go(function () use ($channel, $jobData) {
                try {
                    $result = $this->processSingleJob($jobData);
                    $channel->push(['success' => true, 'result' => $result]);
                } catch (Throwable $e) {
                    $channel->push(['success' => false, 'exception' => $e]);
                }
            });
        }
        
        // 收集结果
        foreach ($channels as $channel) {
            $result = $channel->pop();
            // 处理结果...
        }
    }
}

测试与验证

单元测试示例

// tests/QueueTest.php
<?php

use PHPUnitFrameworkTestCase;
use AsyncQueueCoreQueue;

class QueueTest extends TestCase
{
    private Queue $queue;
    
    protected function setUp(): void
    {
        $config = [
            'host' => getenv('REDIS_HOST') ?: '127.0.0.1',
            'port' => getenv('REDIS_PORT') ?: 6379,
            'queue_name' => 'test_queue_' . uniqid()
        ];
        
        $this->queue = new Queue($config);
    }
    
    public function testPushAndPop(): void
    {
        $jobId = $this->queue->push('TestJob', ['data' => 'test']);
        
        $this->assertNotEmpty($jobId);
        
        $jobData = $this->queue->pop();
        
        $this->assertIsArray($jobData);
        $this->assertEquals($jobId, $jobData['id']);
        $this->assertEquals('TestJob', $jobData['class']);
    }
    
    public function testDelayedJob(): void
    {
        $jobId = $this->queue->push('TestJob', ['data' => 'delayed'], 1);
        
        // 立即获取应该为空
        $jobData = $this->queue->pop();
        $this->assertNull($jobData);
        
        // 等待1秒后应该能获取到
        sleep(2);
        $jobData = $this->queue->pop();
        $this->assertNotNull($jobData);
        $this->assertEquals($jobId, $jobData['id']);
    }
    
    protected function tearDown(): void
    {
        // 清理测试数据
    }
}

生产环境部署建议

  1. 高可用配置:使用Redis Sentinel或Cluster
  2. 监控告警:集成Prometheus + Grafana
  3. 日志收集:使用ELK Stack或Loki
  4. 进程管理:使用Supervisor或systemd
  5. 安全加固:Redis密码认证、网络隔离

Supervisor配置示例

# /etc/supervisor/conf.d/async_queue.conf
[program:async_queue_worker]
command=php /var/www/async-queue/worker.php
directory=/var/www/async-queue
user=www-data
autostart=true
autorestart=true
startretries=3
numprocs=4
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/async_queue/worker.log
stderr_logfile=/var/log/async_queue/worker.error.log

总结与扩展

通过本教程,我们实现了一个完整的PHP异步任务队列系统,具备以下特性:

  • 基于Redis的高性能队列存储
  • Swoole协程支持的高并发处理
  • 延迟任务和重试机制
  • 完善的监控和管理功能
  • 生产级别的部署方案

扩展方向

  • 添加优先级队列支持
  • 实现任务依赖关系
  • 集成消息中间件(RabbitMQ/Kafka)
  • 添加Web管理界面
  • 支持分布式Worker调度

这个系统可以广泛应用于电商、社交、物联网等需要异步处理的场景,显著提升系统吞吐量和用户体验。

PHP异步任务队列系统实战:基于Redis与Swoole的高性能解决方案 | PHP高级编程教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步任务队列系统实战:基于Redis与Swoole的高性能解决方案 | PHP高级编程教程 https://www.taomawang.com/server/php/1534.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务