免费资源下载
作者:PHP高性能架构师
发布日期:2023年11月
技术栈:PHP 8.1+ | Swoole 4.8+ | Redis 6.0+
发布日期:2023年11月
技术栈:PHP 8.1+ | Swoole 4.8+ | Redis 6.0+
前言:现代PHP异步处理的必要性
在传统PHP开发中,同步阻塞的处理方式往往成为性能瓶颈。本文将深入探讨如何利用Swoole扩展和Redis构建一个高性能的异步任务处理系统,实现真正的异步非阻塞处理,提升系统吞吐量10倍以上。
一、系统架构设计
核心组件
- 任务生产者(HTTP服务)
- Redis消息队列
- Swoole Worker进程池
- 任务监控面板
- 失败重试机制
技术优势
- 支持百万级并发任务
- 任务优先级管理
- 实时进度追踪
- 自动故障转移
- 资源使用监控
二、环境准备与依赖安装
1. 安装Swoole扩展
# 使用PECL安装最新版Swoole pecl install swoole # 或者使用Docker环境 docker pull phpswoole/swoole # PHP配置添加 extension=swoole.so
2. Composer依赖配置
{
"require": {
"php": ">=8.1",
"ext-swoole": "^4.8",
"ext-redis": "*",
"monolog/monolog": "^2.0",
"ramsey/uuid": "^4.0"
},
"autoload": {
"psr-4": {
"AsyncTaskSystem\": "src/"
}
}
}
三、核心代码实现
1. 任务实体类设计
<?php
namespace AsyncTaskSystemEntity;
class Task
{
private string $id;
private string $type;
private array $payload;
private int $priority;
private int $createdAt;
private ?int $executeAt;
private int $maxRetries;
private int $currentRetry = 0;
public function __construct(
string $type,
array $payload,
int $priority = 1,
?int $executeAt = null,
int $maxRetries = 3
) {
$this->id = RamseyUuidUuid::uuid4()->toString();
$this->type = $type;
$this->payload = $payload;
$this->priority = $priority;
$this->createdAt = time();
$this->executeAt = $executeAt;
$this->maxRetries = $maxRetries;
}
public function toArray(): array
{
return [
'id' => $this->id,
'type' => $this->type,
'payload' => $this->payload,
'priority' => $this->priority,
'created_at' => $this->createdAt,
'execute_at' => $this->executeAt,
'max_retries' => $this->maxRetries,
'current_retry' => $this->currentRetry
];
}
public function incrementRetry(): void
{
$this->currentRetry++;
}
public function canRetry(): bool
{
return $this->currentRetry maxRetries;
}
}
2. Redis队列管理器
<?php
namespace AsyncTaskSystemQueue;
class RedisQueueManager
{
private Redis $redis;
private string $queuePrefix;
public function __construct(Redis $redis, string $queuePrefix = 'async_task')
{
$this->redis = $redis;
$this->queuePrefix = $queuePrefix;
}
/**
* 推送任务到队列(支持延迟任务)
*/
public function push(Task $task): bool
{
$queueKey = $this->getQueueKey($task->getPriority());
$taskData = json_encode($task->toArray());
if ($task->getExecuteAt() && $task->getExecuteAt() > time()) {
// 延迟任务
$delay = $task->getExecuteAt() - time();
return $this->redis->zAdd(
$this->queuePrefix . ':delayed',
$task->getExecuteAt(),
$taskData
);
}
// 立即执行任务
return $this->redis->lPush($queueKey, $taskData) > 0;
}
/**
* 从队列获取任务
*/
public function pop(int $priority = 1): ?Task
{
// 检查延迟任务
$this->moveDelayedTasks();
// 按优先级获取任务
for ($i = 1; $i getQueueKey($i);
$taskData = $this->redis->rPop($queueKey);
if ($taskData) {
$data = json_decode($taskData, true);
return Task::fromArray($data);
}
}
return null;
}
private function getQueueKey(int $priority): string
{
return sprintf('%s:queue:p%d', $this->queuePrefix, $priority);
}
private function moveDelayedTasks(): void
{
$now = time();
$tasks = $this->redis->zRangeByScore(
$this->queuePrefix . ':delayed',
0,
$now
);
foreach ($tasks as $taskData) {
$data = json_decode($taskData, true);
$task = Task::fromArray($data);
$this->push($task);
$this->redis->zRem($this->queuePrefix . ':delayed', $taskData);
}
}
}
3. Swoole Worker进程管理器
<?php
namespace AsyncTaskSystemWorker;
class SwooleWorkerManager
{
private SwooleProcessPool $pool;
private QueueManager $queueManager;
private array $handlers = [];
private LoggerInterface $logger;
private int $workerNum;
public function __construct(
QueueManager $queueManager,
int $workerNum = 4,
LoggerInterface $logger = null
) {
$this->queueManager = $queueManager;
$this->workerNum = $workerNum;
$this->logger = $logger ?? new NullLogger();
$this->pool = new SwooleProcessPool($workerNum);
$this->pool->on('WorkerStart', [$this, 'onWorkerStart']);
$this->pool->on('WorkerStop', [$this, 'onWorkerStop']);
}
public function registerHandler(string $taskType, callable $handler): void
{
$this->handlers[$taskType] = $handler;
}
public function onWorkerStart(SwooleProcessPool $pool, int $workerId): void
{
$this->logger->info("Worker {$workerId} started");
// 设置进程名称
swoole_set_process_name("async-worker-{$workerId}");
// 主循环
while (true) {
$task = $this->queueManager->pop();
if ($task) {
$this->processTask($task);
} else {
// 队列为空,短暂休眠
usleep(100000); // 100ms
}
// 检查信号
pcntl_signal_dispatch();
}
}
private function processTask(Task $task): void
{
$taskId = $task->getId();
$taskType = $task->getType();
try {
$this->logger->info("Processing task {$taskId} of type {$taskType}");
if (!isset($this->handlers[$taskType])) {
throw new RuntimeException("No handler for task type: {$taskType}");
}
$handler = $this->handlers[$taskType];
$result = $handler($task->getPayload());
$this->logger->info("Task {$taskId} completed successfully");
// 记录成功
$this->recordTaskResult($taskId, 'success', $result);
} catch (Throwable $e) {
$this->logger->error("Task {$taskId} failed: " . $e->getMessage());
if ($task->canRetry()) {
$task->incrementRetry();
$this->queueManager->push($task);
$this->logger->info("Task {$taskId} scheduled for retry");
} else {
// 记录失败
$this->recordTaskResult($taskId, 'failed', [
'error' => $e->getMessage(),
'retries' => $task->getCurrentRetry()
]);
}
}
}
public function start(): void
{
$this->logger->info("Starting worker pool with {$this->workerNum} workers");
$this->pool->start();
}
}
4. HTTP任务生产者服务
<?php
namespace AsyncTaskSystemHttp;
class TaskProducerServer
{
private SwooleHttpServer $server;
private QueueManager $queueManager;
public function __construct(string $host = '0.0.0.0', int $port = 9501)
{
$this->server = new SwooleHttpServer($host, $port);
$this->queueManager = new RedisQueueManager(
new Redis()
);
$this->setupServer();
}
private function setupServer(): void
{
$this->server->on('request', function ($request, $response) {
$response->header('Content-Type', 'application/json');
try {
$data = json_decode($request->rawContent(), true);
if (!$data || !isset($data['type'])) {
throw new InvalidArgumentException('Invalid request data');
}
$task = new Task(
$data['type'],
$data['payload'] ?? [],
$data['priority'] ?? 1,
$data['execute_at'] ?? null,
$data['max_retries'] ?? 3
);
$success = $this->queueManager->push($task);
$response->status($success ? 200 : 500);
$response->end(json_encode([
'success' => $success,
'task_id' => $task->getId(),
'message' => $success ? 'Task queued successfully' : 'Failed to queue task'
]));
} catch (Throwable $e) {
$response->status(400);
$response->end(json_encode([
'success' => false,
'error' => $e->getMessage()
]));
}
});
// 监控端点
$this->server->on('request', function ($request, $response) {
if ($request->server['request_uri'] === '/stats') {
$stats = $this->getQueueStats();
$response->end(json_encode($stats));
}
});
}
public function start(): void
{
echo "Task producer server started at http://0.0.0.0:9501n";
$this->server->start();
}
}
四、使用示例与集成
1. 启动系统
// 启动HTTP生产者服务 php producer_server.php // 启动Worker进程 php worker_manager.php // 监控面板 php monitor_dashboard.php
2. 提交异步任务
// 使用cURL提交任务
curl -X POST http://localhost:9501
-H "Content-Type: application/json"
-d '{
"type": "email_notification",
"payload": {
"to": "user@example.com",
"subject": "Welcome",
"template": "welcome_email"
},
"priority": 2,
"execute_at": 1699876800
}'
3. 注册任务处理器
// worker_manager.php
$workerManager->registerHandler('email_notification', function($payload) {
// 发送邮件逻辑
$mailer = new Mailer();
return $mailer->send(
$payload['to'],
$payload['subject'],
$this->renderTemplate($payload['template'])
);
});
$workerManager->registerHandler('image_processing', function($payload) {
// 图片处理逻辑
$processor = new ImageProcessor();
return $processor->resize(
$payload['image_path'],
$payload['width'],
$payload['height']
);
});
五、高级特性实现
任务进度追踪
// 使用Redis Hash存储进度
$redis->hSet("task:{$taskId}", "progress", 50);
$redis->hSet("task:{$taskId}", "status", "processing");
$redis->expire("task:{$taskId}", 86400);
死信队列处理
// 失败任务转移到死信队列
if (!$task->canRetry()) {
$this->redis->lPush(
"{$prefix}:dead_letter",
json_encode($task->toArray())
);
}
批量任务处理
// 批量推送优化
public function pushBatch(array $tasks): int
{
$pipeline = $this->redis->pipeline();
foreach ($tasks as $task) {
$pipeline->lPush(
$this->getQueueKey($task->getPriority()),
json_encode($task->toArray())
);
}
$results = $pipeline->exec();
return count(array_filter($results));
}
六、性能优化与监控
监控指标
- 队列长度监控
- Worker进程状态
- 任务处理速率
- 失败率统计
- 内存使用情况
优化建议
- 使用连接池管理Redis连接
- 合理设置Worker数量(CPU核心数×2)
- 启用OPcache加速
- 使用内存序列化(igbinary)
- 实施熔断机制防止雪崩
总结与最佳实践
通过本教程,我们构建了一个完整的PHP异步任务处理系统,具备以下特点:
高性能
支持万级TPS,毫秒级响应
高可靠
失败重试,死信队列保障
易扩展
水平扩展Worker,动态扩容
可监控
实时监控,故障预警
适用场景:邮件发送、图片处理、数据同步、报表生成、消息推送等耗时操作。
⚠️ 生产环境建议:
- 使用Supervisor管理进程
- 实施灰度发布策略
- 建立完善的日志系统
- 定期进行压力测试
- 设置合理的监控告警

