免费资源下载
作者:后端架构师
发布日期:2023年11月
难度:高级
发布日期:2023年11月
难度:高级
引言:为什么需要异步任务队列?
在现代Web应用中,许多耗时操作如邮件发送、图片处理、数据同步等会阻塞用户请求,影响用户体验。本文将构建一个基于Redis和Swoole的高性能异步任务队列系统,实现任务的解耦和异步处理。
技术栈:PHP 8.1+、Redis 6+、Swoole 4.8+、Composer
系统架构设计
架构图说明
生产者 → Redis队列 → 消费者Worker → 任务处理器 → 结果存储
↑ ↑ ↑
└─ 监控系统 ─┴─ 日志系统 ─┘
核心组件
- 任务生产者(Producer):生成任务并推送到队列
- Redis队列:存储待处理任务
- 任务消费者(Consumer):从队列获取并处理任务
- 任务监控器(Monitor):监控队列状态和任务执行情况
完整实现教程
步骤1:环境准备与依赖安装
# composer.json
{
"require": {
"php": ">=8.1",
"ext-redis": "*",
"ext-swoole": ">=4.8.0",
"monolog/monolog": "^2.0"
},
"autoload": {
"psr-4": {
"AsyncQueue\": "src/"
}
}
}
# 安装依赖
composer install
# 创建项目结构
mkdir -p src/{Core,Job,Worker,Exception}
mkdir -p config logs storage
步骤2:核心队列类实现
// src/Core/Queue.php
<?php
namespace AsyncQueueCore;
use Redis;
use AsyncQueueExceptionQueueException;
class Queue
{
private Redis $redis;
private string $queueName;
private array $config;
public function __construct(array $config)
{
$this->config = $config;
$this->queueName = $config['queue_name'] ?? 'async_queue';
$this->initRedis();
}
private function initRedis(): void
{
$this->redis = new Redis();
$connected = $this->redis->connect(
$this->config['host'] ?? '127.0.0.1',
$this->config['port'] ?? 6379,
$this->config['timeout'] ?? 2.0
);
if (!$connected) {
throw new QueueException('Redis连接失败');
}
if (!empty($this->config['password'])) {
$this->redis->auth($this->config['password']);
}
if (!empty($this->config['database'])) {
$this->redis->select($this->config['database']);
}
}
/**
* 推送任务到队列
*/
public function push(string $jobClass, array $data, int $delay = 0): string
{
$jobId = uniqid('job_', true);
$jobData = [
'id' => $jobId,
'class' => $jobClass,
'data' => $data,
'created_at' => time(),
'attempts' => 0,
'max_attempts' => $this->config['max_attempts'] ?? 3
];
$serialized = serialize($jobData);
if ($delay > 0) {
// 延迟队列使用有序集合
$score = time() + $delay;
$this->redis->zAdd("{$this->queueName}:delayed", $score, $serialized);
} else {
// 立即执行队列
$this->redis->lPush($this->queueName, $serialized);
}
return $jobId;
}
/**
* 从队列获取任务
*/
public function pop(): ?array
{
// 检查延迟队列
$this->moveDelayedJobs();
$serialized = $this->redis->rPop($this->queueName);
if (!$serialized) {
return null;
}
$jobData = unserialize($serialized);
if (!is_array($jobData)) {
throw new QueueException('任务数据反序列化失败');
}
return $jobData;
}
/**
* 将到期的延迟任务移到执行队列
*/
private function moveDelayedJobs(): void
{
$now = time();
$items = $this->redis->zRangeByScore(
"{$this->queueName}:delayed",
0,
$now
);
if (!empty($items)) {
$this->redis->multi();
foreach ($items as $item) {
$this->redis->lPush($this->queueName, $item);
$this->redis->zRem("{$this->queueName}:delayed", $item);
}
$this->redis->exec();
}
}
/**
* 获取队列长度
*/
public function size(): int
{
return $this->redis->lLen($this->queueName);
}
/**
* 获取延迟队列长度
*/
public function delayedSize(): int
{
return $this->redis->zCard("{$this->queueName}:delayed");
}
}
步骤3:任务基类与处理器
// src/Job/JobInterface.php
<?php
namespace AsyncQueueJob;
interface JobInterface
{
/**
* 执行任务
*/
public function handle(array $data): bool;
/**
* 任务失败时的处理
*/
public function failed(array $data, Throwable $exception): void;
}
// src/Job/AbstractJob.php
<?php
namespace AsyncQueueJob;
abstract class AbstractJob implements JobInterface
{
protected int $timeout = 60;
protected int $maxAttempts = 3;
public function getTimeout(): int
{
return $this->timeout;
}
public function getMaxAttempts(): int
{
return $this->maxAttempts;
}
}
// 示例任务:发送邮件
// src/Job/SendEmailJob.php
<?php
namespace AsyncQueueJob;
class SendEmailJob extends AbstractJob
{
public function handle(array $data): bool
{
// 模拟邮件发送
$to = $data['to'] ?? '';
$subject = $data['subject'] ?? '';
$body = $data['body'] ?? '';
if (empty($to)) {
throw new InvalidArgumentException('收件人邮箱不能为空');
}
// 实际发送邮件逻辑
// mail($to, $subject, $body);
sleep(2); // 模拟耗时操作
echo "邮件发送成功: {$to}n";
return true;
}
public function failed(array $data, Throwable $exception): void
{
// 记录失败日志或发送告警
error_log("邮件发送失败: " . $exception->getMessage());
// 可以在这里实现重试逻辑或通知管理员
$this->notifyAdmin($data, $exception);
}
private function notifyAdmin(array $data, Throwable $exception): void
{
// 通知管理员逻辑
}
}
步骤4:基于Swoole的Worker实现
// src/Worker/Worker.php
<?php
namespace AsyncQueueWorker;
use AsyncQueueCoreQueue;
use AsyncQueueJobJobInterface;
use SwooleProcess;
use SwooleTimer;
class Worker
{
private Queue $queue;
private array $config;
private bool $shouldStop = false;
private int $processedCount = 0;
private int $startTime;
public function __construct(Queue $queue, array $config = [])
{
$this->queue = $queue;
$this->config = array_merge([
'sleep' => 1,
'max_memory' => 128 * 1024 * 1024, // 128MB
'max_jobs' => 1000,
'time_limit' => 3600, // 1小时
], $config);
$this->startTime = time();
}
/**
* 启动Worker进程
*/
public function start(): void
{
$this->registerSignalHandlers();
echo "Worker启动,PID: " . getmypid() . "n";
echo "队列: " . $this->config['queue_name'] . "n";
while (!$this->shouldStop) {
$this->checkLimits();
$jobData = $this->queue->pop();
if ($jobData === null) {
// 队列为空,等待
sleep($this->config['sleep']);
continue;
}
$this->processJob($jobData);
$this->processedCount++;
// 每处理10个任务检查一次内存
if ($this->processedCount % 10 === 0) {
$this->checkMemory();
}
}
echo "Worker正常停止n";
}
/**
* 处理单个任务
*/
private function processJob(array $jobData): void
{
$jobId = $jobData['id'];
$jobClass = $jobData['class'];
$data = $jobData['data'];
$attempts = $jobData['attempts'] + 1;
echo "处理任务: {$jobId} (尝试: {$attempts})n";
try {
if (!class_exists($jobClass)) {
throw new RuntimeException("任务类不存在: {$jobClass}");
}
$job = new $jobClass();
if (!$job instanceof JobInterface) {
throw new RuntimeException("任务类必须实现JobInterface");
}
// 设置执行超时
$timeout = $job->getTimeout();
$result = $this->executeWithTimeout(
fn() => $job->handle($data),
$timeout
);
if ($result) {
echo "任务完成: {$jobId}n";
}
} catch (Throwable $e) {
echo "任务失败: {$jobId} - " . $e->getMessage() . "n";
// 重试逻辑
if ($attempts getMessage();
// 延迟重试:指数退避
$delay = min(300, pow(2, $attempts) * 5); // 最大5分钟
$this->queue->push($jobClass, $data, $delay);
echo "任务已安排重试,延迟 {$delay}秒n";
} else {
// 最终失败
if (isset($job) && $job instanceof JobInterface) {
$job->failed($data, $e);
}
echo "任务最终失败,已达最大重试次数n";
}
}
}
/**
* 带超时的任务执行
*/
private function executeWithTimeout(callable $callback, int $timeout): mixed
{
if (!extension_loaded('swoole')) {
return $callback();
}
$channel = new SwooleCoroutineChannel(1);
go(function () use ($channel, $callback) {
try {
$result = $callback();
$channel->push(['success' => true, 'result' => $result]);
} catch (Throwable $e) {
$channel->push(['success' => false, 'exception' => $e]);
}
});
$result = $channel->pop($timeout);
if ($result === false) {
throw new RuntimeException("任务执行超时 ({$timeout}秒)");
}
if (!$result['success']) {
throw $result['exception'];
}
return $result['result'];
}
/**
* 检查内存使用
*/
private function checkMemory(): void
{
$memory = memory_get_usage(true);
$maxMemory = $this->config['max_memory'];
if ($memory > $maxMemory) {
echo "内存使用过高: " . round($memory / 1024 / 1024, 2) . "MBn";
$this->shouldStop = true;
}
}
/**
* 检查运行限制
*/
private function checkLimits(): void
{
// 检查时间限制
if (time() - $this->startTime > $this->config['time_limit']) {
echo "达到时间限制,停止Workern";
$this->shouldStop = true;
}
// 检查处理任务数量限制
if ($this->processedCount >= $this->config['max_jobs']) {
echo "达到最大任务处理数,停止Workern";
$this->shouldStop = true;
}
}
/**
* 注册信号处理器
*/
private function registerSignalHandlers(): void
{
if (extension_loaded('pcntl')) {
pcntl_async_signals(true);
pcntl_signal(SIGTERM, fn() => $this->shouldStop = true);
pcntl_signal(SIGINT, fn() => $this->shouldStop = true);
}
}
}
步骤5:任务生产者与监控系统
// producer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use AsyncQueueCoreQueue;
use AsyncQueueJobSendEmailJob;
$config = [
'host' => '127.0.0.1',
'port' => 6379,
'queue_name' => 'async_queue',
'max_attempts' => 3
];
$queue = new Queue($config);
// 生产邮件发送任务
$jobId = $queue->push(SendEmailJob::class, [
'to' => 'user@example.com',
'subject' => '欢迎注册',
'body' => '感谢您注册我们的服务!'
]);
echo "任务已创建: {$jobId}n";
// 生产延迟任务(5分钟后执行)
$delayedJobId = $queue->push(SendEmailJob::class, [
'to' => 'user@example.com',
'subject' => '提醒通知',
'body' => '这是您的提醒消息'
], 300); // 300秒 = 5分钟
echo "延迟任务已创建: {$delayedJobId}n";
// monitor.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use AsyncQueueCoreQueue;
class QueueMonitor
{
private Queue $queue;
public function __construct(Queue $queue)
{
$this->queue = $queue;
}
public function getStats(): array
{
return [
'pending' => $this->queue->size(),
'delayed' => $this->queue->delayedSize(),
'memory_usage' => round(memory_get_usage(true) / 1024 / 1024, 2) . 'MB',
'timestamp' => date('Y-m-d H:i:s')
];
}
public function runWebMonitor(int $port = 8080): void
{
if (!extension_loaded('swoole')) {
echo "需要Swoole扩展来运行Web监控n";
return;
}
$http = new SwooleHttpServer('0.0.0.0', $port);
$http->on('request', function ($request, $response) {
$stats = $this->getStats();
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'success',
'data' => $stats
], JSON_PRETTY_PRINT));
});
echo "监控服务启动: http://0.0.0.0:{$port}n";
$http->start();
}
}
// 使用示例
$config = [
'host' => '127.0.0.1',
'port' => 6379,
'queue_name' => 'async_queue'
];
$queue = new Queue($config);
$monitor = new QueueMonitor($queue);
// 命令行查看状态
print_r($monitor->getStats());
// 启动Web监控(可选)
// $monitor->runWebMonitor();
步骤6:部署与进程管理
# worker.sh - Worker启动脚本
#!/bin/bash
BASE_DIR=$(cd "$(dirname "$0")"; pwd)
PID_FILE="$BASE_DIR/storage/worker.pid"
LOG_FILE="$BASE_DIR/logs/worker.log"
start() {
if [ -f "$PID_FILE" ]; then
PID=$(cat "$PID_FILE")
if kill -0 $PID 2>/dev/null; then
echo "Worker已经在运行 (PID: $PID)"
return 1
fi
fi
nohup php "$BASE_DIR/worker.php" >> "$LOG_FILE" 2>&1 &
PID=$!
echo $PID > "$PID_FILE"
echo "Worker启动成功 (PID: $PID)"
}
stop() {
if [ ! -f "$PID_FILE" ]; then
echo "Worker未运行"
return 1
fi
PID=$(cat "$PID_FILE")
if kill -0 $PID 2>/dev/null; then
kill -TERM $PID
rm "$PID_FILE"
echo "Worker已停止"
else
echo "Worker进程不存在,清理PID文件"
rm "$PID_FILE"
fi
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
stop
sleep 2
start
;;
*)
echo "用法: $0 {start|stop|restart}"
exit 1
;;
esac
# worker.php - Worker入口文件
<?php
require_once __DIR__ . '/vendor/autoload.php';
use AsyncQueueCoreQueue;
use AsyncQueueWorkerWorker;
// 加载配置
$config = require __DIR__ . '/config/queue.php';
$queue = new Queue($config);
$worker = new Worker($queue, $config);
// 设置错误处理器
set_error_handler(function($errno, $errstr, $errfile, $errline) {
error_log("Worker错误: [$errno] $errstr in $errfile:$errline");
return true;
});
// 设置异常处理器
set_exception_handler(function($exception) {
error_log("Worker异常: " . $exception->getMessage());
exit(1);
});
// 启动Worker
$worker->start();
性能优化与高级特性
1. 连接池优化
// src/Core/ConnectionPool.php
<?php
namespace AsyncQueueCore;
use SwooleCoroutineChannel;
class ConnectionPool
{
private Channel $pool;
private int $size;
private string $host;
private int $port;
public function __construct(string $host, int $port, int $size = 10)
{
$this->host = $host;
$this->port = $port;
$this->size = $size;
$this->pool = new Channel($size);
$this->initPool();
}
private function initPool(): void
{
for ($i = 0; $i size; $i++) {
$redis = new Redis();
$redis->connect($this->host, $this->port);
$this->pool->push($redis);
}
}
public function get(): Redis
{
return $this->pool->pop();
}
public function put(Redis $redis): void
{
$this->pool->push($redis);
}
}
2. 批量任务处理
// src/Worker/BatchWorker.php
<?php
namespace AsyncQueueWorker;
class BatchWorker extends Worker
{
private int $batchSize = 10;
public function setBatchSize(int $size): void
{
$this->batchSize = max(1, $size);
}
protected function processBatch(): void
{
$jobs = [];
for ($i = 0; $i batchSize; $i++) {
$jobData = $this->queue->pop();
if ($jobData === null) {
break;
}
$jobs[] = $jobData;
}
if (empty($jobs)) {
return;
}
// 并行处理批量任务
$this->processJobsParallel($jobs);
}
private function processJobsParallel(array $jobs): void
{
$channels = [];
foreach ($jobs as $index => $jobData) {
$channel = new SwooleCoroutineChannel(1);
$channels[$index] = $channel;
go(function () use ($channel, $jobData) {
try {
$result = $this->processSingleJob($jobData);
$channel->push(['success' => true, 'result' => $result]);
} catch (Throwable $e) {
$channel->push(['success' => false, 'exception' => $e]);
}
});
}
// 收集结果
foreach ($channels as $channel) {
$result = $channel->pop();
// 处理结果...
}
}
}
测试与验证
单元测试示例
// tests/QueueTest.php
<?php
use PHPUnitFrameworkTestCase;
use AsyncQueueCoreQueue;
class QueueTest extends TestCase
{
private Queue $queue;
protected function setUp(): void
{
$config = [
'host' => getenv('REDIS_HOST') ?: '127.0.0.1',
'port' => getenv('REDIS_PORT') ?: 6379,
'queue_name' => 'test_queue_' . uniqid()
];
$this->queue = new Queue($config);
}
public function testPushAndPop(): void
{
$jobId = $this->queue->push('TestJob', ['data' => 'test']);
$this->assertNotEmpty($jobId);
$jobData = $this->queue->pop();
$this->assertIsArray($jobData);
$this->assertEquals($jobId, $jobData['id']);
$this->assertEquals('TestJob', $jobData['class']);
}
public function testDelayedJob(): void
{
$jobId = $this->queue->push('TestJob', ['data' => 'delayed'], 1);
// 立即获取应该为空
$jobData = $this->queue->pop();
$this->assertNull($jobData);
// 等待1秒后应该能获取到
sleep(2);
$jobData = $this->queue->pop();
$this->assertNotNull($jobData);
$this->assertEquals($jobId, $jobData['id']);
}
protected function tearDown(): void
{
// 清理测试数据
}
}
生产环境部署建议
- 高可用配置:使用Redis Sentinel或Cluster
- 监控告警:集成Prometheus + Grafana
- 日志收集:使用ELK Stack或Loki
- 进程管理:使用Supervisor或systemd
- 安全加固:Redis密码认证、网络隔离
Supervisor配置示例
# /etc/supervisor/conf.d/async_queue.conf
[program:async_queue_worker]
command=php /var/www/async-queue/worker.php
directory=/var/www/async-queue
user=www-data
autostart=true
autorestart=true
startretries=3
numprocs=4
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/async_queue/worker.log
stderr_logfile=/var/log/async_queue/worker.error.log
总结与扩展
通过本教程,我们实现了一个完整的PHP异步任务队列系统,具备以下特性:
- 基于Redis的高性能队列存储
- Swoole协程支持的高并发处理
- 延迟任务和重试机制
- 完善的监控和管理功能
- 生产级别的部署方案
扩展方向
- 添加优先级队列支持
- 实现任务依赖关系
- 集成消息中间件(RabbitMQ/Kafka)
- 添加Web管理界面
- 支持分布式Worker调度
这个系统可以广泛应用于电商、社交、物联网等需要异步处理的场景,显著提升系统吞吐量和用户体验。

