原创作者:PHP技术栈 | 发布日期:2023年10月
一、异步处理的核心价值
在传统PHP开发中,同步阻塞式处理往往成为性能瓶颈。当用户请求涉及耗时操作(如邮件发送、图片处理、数据报表生成)时,用户需要等待所有操作完成才能获得响应。基于Swoole扩展的异步队列系统能够将耗时任务解耦,实现请求的即时响应和后台任务的并行处理。
技术选型对比
| 方案 | 优点 | 缺点 |
|---|---|---|
| 传统同步处理 | 开发简单,逻辑直观 | 响应延迟,资源占用高 |
| 数据库队列 | 数据持久化,实现简单 | 性能瓶颈,IO压力大 |
| Swoole内存队列 | 高性能,低延迟,资源利用率高 | 需要安装扩展,学习曲线较陡 |
二、系统架构设计
我们设计一个三层架构的队列系统:
2.1 架构组件
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 任务生产者 │───▶│ 内存队列服务 │───▶│ 任务消费者 │
│ (Web应用) │ │ (Swoole Server)│ │ (Worker进程) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HTTP API接口 │ │ 任务持久化存储 │ │ 结果回调通知 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
2.2 数据流转流程
- 生产者通过HTTP API提交任务到队列服务器
- Swoole Server接收任务并存入内存队列
- Worker进程从队列获取任务并执行
- 执行结果可选持久化到Redis/MySQL
- 通过Webhook回调通知业务系统
三、完整代码实现
3.1 环境准备
# 安装Swoole扩展
pecl install swoole
# 验证安装
php -m | grep swoole
# 创建项目目录结构
mkdir -p async-queue/{config,src/Queue,src/Worker,logs}
3.2 队列服务器核心代码
<?php
// src/Queue/QueueServer.php
declare(strict_types=1);
namespace AsyncQueueQueue;
use SwooleProcess;
use SwooleTable;
class QueueServer
{
private $server;
private $taskTable;
private $config = [
'worker_num' => 4,
'task_worker_num' => 8,
'max_request' => 1000,
'dispatch_mode' => 3,
];
public function __construct(string $host = '0.0.0.0', int $port = 9501)
{
// 创建内存表存储任务状态
$this->createTaskTable();
$this->server = new SwooleHttpServer($host, $port);
$this->server->set($this->config);
$this->registerCallbacks();
}
private function createTaskTable(): void
{
$this->taskTable = new Table(1024);
$this->taskTable->column('task_id', Table::TYPE_STRING, 32);
$this->taskTable->column('status', Table::TYPE_INT, 1);
$this->taskTable->column('create_time', Table::TYPE_INT, 10);
$this->taskTable->column('data', Table::TYPE_STRING, 2048);
$this->taskTable->create();
}
private function registerCallbacks(): void
{
$this->server->on('request', function ($request, $response) {
$this->handleRequest($request, $response);
});
$this->server->on('task', function ($server, $taskId, $workerId, $data) {
return $this->handleTask($data);
});
$this->server->on('finish', function ($server, $taskId, $data) {
$this->handleTaskFinish($taskId, $data);
});
}
private function handleRequest($request, $response): void
{
$path = $request->server['request_uri'] ?? '';
switch ($path) {
case '/task/submit':
$this->submitTask($request, $response);
break;
case '/task/status':
$this->getTaskStatus($request, $response);
break;
default:
$response->status(404);
$response->end(json_encode(['error' => 'Not Found']));
}
}
private function submitTask($request, $response): void
{
$data = json_decode($request->rawContent(), true) ?? [];
if (empty($data['type']) || empty($data['payload'])) {
$response->status(400);
$response->end(json_encode(['error' => 'Invalid task data']));
return;
}
$taskId = uniqid('task_', true);
$taskData = [
'task_id' => $taskId,
'type' => $data['type'],
'payload' => $data['payload'],
'callback_url' => $data['callback_url'] ?? null,
];
// 存储到内存表
$this->taskTable->set($taskId, [
'task_id' => $taskId,
'status' => 0, // 0: pending, 1: processing, 2: completed
'create_time' => time(),
'data' => json_encode($taskData),
]);
// 投递异步任务
$taskId = $this->server->task(json_encode($taskData));
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'success' => true,
'task_id' => $taskId,
'message' => 'Task submitted successfully'
]));
}
private function handleTask(string $data): string
{
$task = json_decode($data, true);
// 更新任务状态为处理中
$this->taskTable->set($task['task_id'], ['status' => 1], true);
// 模拟任务处理
$result = $this->processTask($task);
return json_encode([
'task_id' => $task['task_id'],
'result' => $result,
'processed_at' => time()
]);
}
private function processTask(array $task): array
{
// 根据任务类型执行不同的处理逻辑
switch ($task['type']) {
case 'email':
return $this->sendEmail($task['payload']);
case 'image':
return $this->processImage($task['payload']);
case 'report':
return $this->generateReport($task['payload']);
default:
return ['error' => 'Unknown task type'];
}
}
private function sendEmail(array $payload): array
{
// 实际项目中替换为真实的邮件发送逻辑
sleep(2); // 模拟耗时操作
return [
'success' => true,
'message' => "Email sent to {$payload['to']}"
];
}
public function start(): void
{
echo "Queue server starting on 0.0.0.0:9501n";
$this->server->start();
}
}
// 启动脚本
$server = new QueueServer();
$server->start();
3.3 任务生产者示例
<?php
// producer.php
class TaskProducer
{
private $serverUrl;
public function __construct(string $serverUrl = 'http://localhost:9501')
{
$this->serverUrl = $serverUrl;
}
public function submitEmailTask(array $emailData): array
{
$task = [
'type' => 'email',
'payload' => [
'to' => $emailData['to'],
'subject' => $emailData['subject'],
'body' => $emailData['body'],
],
'callback_url' => $emailData['callback_url'] ?? null,
];
return $this->sendToQueue($task);
}
private function sendToQueue(array $task): array
{
$ch = curl_init($this->serverUrl . '/task/submit');
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POST => true,
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_POSTFIELDS => json_encode($task),
CURLOPT_TIMEOUT => 5,
]);
$response = curl_exec($ch);
curl_close($ch);
return json_decode($response, true) ?? [];
}
}
// 使用示例
$producer = new TaskProducer();
$result = $producer->submitEmailTask([
'to' => 'user@example.com',
'subject' => '欢迎邮件',
'body' => '感谢您注册我们的服务',
]);
if ($result['success'] ?? false) {
echo "任务提交成功,任务ID: " . $result['task_id'];
} else {
echo "任务提交失败";
}
3.4 监控脚本实现
<?php
// monitor.php
class QueueMonitor
{
private $serverUrl;
public function __construct(string $serverUrl = 'http://localhost:9501')
{
$this->serverUrl = $serverUrl;
}
public function getStats(): array
{
// 获取Swoole服务器状态
$ch = curl_init($this->serverUrl . '/stats');
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 3,
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpCode !== 200) {
return ['error' => 'Unable to connect to queue server'];
}
return json_decode($response, true) ?? [];
}
public function printDashboard(): void
{
$stats = $this->getStats();
echo "========== 队列监控面板 ==========n";
echo "时间: " . date('Y-m-d H:i:s') . "n";
echo "服务器状态: " . ($stats['server']['running'] ? '运行中' : '停止') . "n";
echo "工作进程: {$stats['workers']['active']}/{$stats['workers']['total']}n";
echo "待处理任务: {$stats['queue']['pending']}n";
echo "处理中任务: {$stats['queue']['processing']}n";
echo "已完成任务: {$stats['queue']['completed']}n";
echo "内存使用: " . round($stats['memory']['usage'] / 1024 / 1024, 2) . " MBn";
echo "=================================n";
}
}
// 定时监控
$monitor = new QueueMonitor();
while (true) {
system('clear'); // Linux/Mac
// system('cls'); // Windows
$monitor->printDashboard();
sleep(5);
}
四、部署与监控
4.1 生产环境部署
# 使用Supervisor管理进程
[program:async_queue]
command=/usr/bin/php /path/to/QueueServer.php
directory=/path/to/project
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/async_queue.log
4.2 Nginx反向代理配置
server {
listen 80;
server_name queue.yourdomain.com;
location / {
proxy_pass http://127.0.0.1:9501;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
五、性能优化策略
5.1 内存优化
- 连接池管理:为数据库、Redis连接建立连接池,避免频繁创建销毁连接
- 内存表分片:当任务量巨大时,采用多张Table分片存储
- 数据压缩:对存储的JSON数据进行gzip压缩
5.2 故障恢复机制
<?php
class TaskRecovery
{
public function recoverFromCrash(): void
{
// 从Redis或MySQL中恢复未完成的任务
$pendingTasks = $this->getPendingTasksFromStorage();
foreach ($pendingTasks as $task) {
if ($this->isTaskExpired($task)) {
$this->markTaskAsFailed($task);
} else {
$this->resubmitTask($task);
}
}
}
private function getPendingTasksFromStorage(): array
{
// 从持久化存储获取崩溃时未完成的任务
// 实现取决于使用的存储引擎
return [];
}
}
5.3 扩展方案
- 优先级队列:实现不同优先级的任务处理
- 延迟任务:支持定时执行的任务调度
- 任务依赖:实现任务之间的依赖关系处理
- 分布式部署:多节点集群部署,提高可用性

