发布日期:2023年11月
技术栈:PHP 8.1+、Swoole 4.8+、Redis、MySQL
引言:为什么PHP需要异步任务处理?
在传统的PHP开发中,同步阻塞式的编程模型在处理耗时任务时面临严重性能瓶颈。本文将带你使用Swoole扩展,构建一个完整的异步任务队列系统,实现真正的非阻塞并发处理。
第一部分:环境准备与架构设计
1.1 系统架构概览
任务队列系统架构:
├── 任务生产者(Web应用)
├── 消息中间件(Redis Stream)
├── 任务消费者(Swoole Worker进程)
├── 结果存储器(MySQL + Redis)
└── 监控面板(WebSocket实时监控)
1.2 环境要求与安装
# 安装Swoole扩展
pecl install swoole
# 或者使用Docker环境
docker run -it --name php-swoole
-v $(pwd):/app
phpswoole/swoole:php8.1
# 验证安装
php -m | grep swoole
第二部分:核心组件实现
2.1 任务消息结构设计
<?php
// src/Task/Message.php
namespace AppTask;
class TaskMessage
{
private string $id;
private string $type;
private array $data;
private int $priority;
private int $createTime;
private int $retryCount = 0;
private ?int $delay = null;
public function __construct(
string $type,
array $data,
int $priority = 1,
?int $delay = null
) {
$this->id = uniqid('task_', true);
$this->type = $type;
$this->data = $data;
$this->priority = $priority;
$this->createTime = time();
$this->delay = $delay;
}
public function toArray(): array
{
return [
'id' => $this->id,
'type' => $this->type,
'data' => $this->data,
'priority' => $this->priority,
'create_time' => $this->createTime,
'retry_count' => $this->retryCount,
'delay' => $this->delay
];
}
public function encode(): string
{
return json_encode($this->toArray(), JSON_UNESCAPED_UNICODE);
}
public static function decode(string $json): self
{
$data = json_decode($json, true);
$message = new self($data['type'], $data['data']);
$message->id = $data['id'];
$message->retryCount = $data['retry_count'] ?? 0;
return $message;
}
}
2.2 Redis队列管理器
<?php
// src/Queue/RedisQueue.php
namespace AppQueue;
use Redis;
use AppTaskTaskMessage;
class RedisQueue
{
private Redis $redis;
private string $streamKey = 'task_stream';
private string $pendingKey = 'task_pending';
private string $failedKey = 'task_failed';
public function __construct(array $config)
{
$this->redis = new Redis();
$this->redis->connect(
$config['host'],
$config['port'],
$config['timeout'] ?? 2.0
);
if (!empty($config['password'])) {
$this->redis->auth($config['password']);
}
$this->redis->select($config['database'] ?? 0);
}
/**
* 投递任务到队列
*/
public function push(TaskMessage $message): string
{
$messageId = $this->redis->xAdd(
$this->streamKey,
'*',
['message' => $message->encode()],
$message->getDelay() * 1000 ?? 0
);
// 记录任务元数据
$this->redis->hSet(
'task_meta',
$message->getId(),
json_encode([
'status' => 'pending',
'push_time' => time()
])
);
return $messageId;
}
/**
* 消费任务
*/
public function pop(string $consumerGroup, string $consumerName): ?array
{
// 检查并创建消费者组
if (!$this->redis->xInfo('GROUPS', $this->streamKey)) {
$this->redis->xGroup(
'CREATE',
$this->streamKey,
$consumerGroup,
'0',
true
);
}
// 从消费者组读取消息
$messages = $this->redis->xReadGroup(
$consumerGroup,
$consumerName,
[$this->streamKey => '>'],
1,
5000 // 阻塞5秒
);
if (empty($messages)) {
return null;
}
$streamMessages = current($messages);
$messageData = current($streamMessages);
return [
'id' => key($streamMessages),
'message' => TaskMessage::decode($messageData['message'])
];
}
/**
* 确认任务完成
*/
public function ack(string $consumerGroup, string $messageId): bool
{
return $this->redis->xAck(
$this->streamKey,
$consumerGroup,
[$messageId]
) > 0;
}
/**
* 任务重试
*/
public function retry(string $messageId, TaskMessage $message): bool
{
$message->incrementRetry();
if ($message->getRetryCount() > 3) {
// 超过重试次数,移到失败队列
return $this->redis->lPush(
$this->failedKey,
$message->encode()
) > 0;
}
// 重新投递到延迟队列
$message->setDelay(60 * $message->getRetryCount()); // 指数退避
return $this->push($message) !== false;
}
}
第三部分:Swoole Worker实现
3.1 多进程任务消费者
<?php
// src/Worker/TaskWorker.php
namespace AppWorker;
use SwooleProcess;
use SwooleTimer;
use AppQueueRedisQueue;
use AppTaskTaskHandler;
class TaskWorker
{
private RedisQueue $queue;
private TaskHandler $handler;
private array $config;
private array $processes = [];
private bool $running = true;
public function __construct(array $config)
{
$this->config = $config;
$this->queue = new RedisQueue($config['redis']);
$this->handler = new TaskHandler();
}
/**
* 启动Worker进程池
*/
public function start(int $workerCount = 4): void
{
echo "启动 {$workerCount} 个Worker进程...n";
// 创建Worker进程
for ($i = 0; $i runWorker($worker, $i);
});
$process->start();
$this->processes[] = $process;
}
// 主进程监控
$this->monitorProcesses();
}
/**
* Worker进程主循环
*/
private function runWorker(Process $worker, int $workerId): void
{
echo "Worker {$workerId} 启动,PID: " . getmypid() . "n";
$consumerName = "worker_{$workerId}_" . uniqid();
while ($this->running) {
try {
// 从队列获取任务
$task = $this->queue->pop(
$this->config['consumer_group'],
$consumerName
);
if (!$task) {
usleep(100000); // 100ms
continue;
}
[$messageId, $message] = $task;
echo "Worker {$workerId} 处理任务: {$message->getId()}n";
// 处理任务
$result = $this->handler->handle($message);
if ($result) {
// 确认任务完成
$this->queue->ack(
$this->config['consumer_group'],
$messageId
);
echo "Worker {$workerId} 完成任务: {$message->getId()}n";
} else {
// 任务失败,重试
$this->queue->retry($messageId, $message);
echo "Worker {$workerId} 任务失败,已重试: {$message->getId()}n";
}
} catch (Throwable $e) {
error_log("Worker {$workerId} 错误: " . $e->getMessage());
usleep(500000); // 500ms
}
}
}
/**
* 进程监控
*/
private function monitorProcesses(): void
{
// 信号处理
Process::signal(SIGTERM, function () {
$this->running = false;
$this->stop();
});
Process::signal(SIGCHLD, function () {
while ($ret = Process::wait(false)) {
echo "进程 {$ret['pid']} 退出n";
}
});
// 心跳检测
Timer::tick(30000, function () {
$this->checkWorkerHealth();
});
}
/**
* 健康检查
*/
private function checkWorkerHealth(): void
{
foreach ($this->processes as $pid => $process) {
if (!$process->isRunning()) {
echo "Worker {$pid} 异常退出,重新启动...n";
// 重新启动进程逻辑
}
}
}
/**
* 停止所有Worker
*/
public function stop(): void
{
$this->running = false;
foreach ($this->processes as $process) {
if ($process->isRunning()) {
$process->kill(SIGTERM);
}
}
echo "所有Worker进程已停止n";
}
}
3.2 任务处理器示例
<?php
// src/Task/TaskHandler.php
namespace AppTask;
class TaskHandler
{
/**
* 处理不同类型的任务
*/
public function handle(TaskMessage $message): bool
{
$method = 'handle' . ucfirst($message->getType());
if (method_exists($this, $method)) {
return $this->$method($message->getData());
}
return $this->handleDefault($message->getData());
}
/**
* 邮件发送任务
*/
private function handleEmail(array $data): bool
{
try {
// 模拟邮件发送
$mailer = new AppServiceMailer();
$result = $mailer->send(
$data['to'],
$data['subject'],
$data['content'],
$data['attachments'] ?? []
);
// 记录发送日志
$this->logEmail($data, $result);
return $result;
} catch (Exception $e) {
error_log("邮件发送失败: " . $e->getMessage());
return false;
}
}
/**
* 图片处理任务
*/
private function handleImageProcess(array $data): bool
{
$imagePath = $data['path'];
$operations = $data['operations'];
try {
$imagick = new Imagick($imagePath);
foreach ($operations as $op) {
switch ($op['type']) {
case 'resize':
$imagick->resizeImage(
$op['width'],
$op['height'],
Imagick::FILTER_LANCZOS,
1
);
break;
case 'crop':
$imagick->cropImage(
$op['width'],
$op['height'],
$op['x'],
$op['y']
);
break;
case 'watermark':
$this->addWatermark($imagick, $op);
break;
}
}
// 保存处理后的图片
$outputPath = $data['output_path'];
$imagick->writeImage($outputPath);
$imagick->destroy();
return true;
} catch (Exception $e) {
error_log("图片处理失败: " . $e->getMessage());
return false;
}
}
/**
* 数据导出任务
*/
private function handleExport(array $data): bool
{
$exportType = $data['type'];
$filters = $data['filters'] ?? [];
// 使用生成器处理大数据量
$generator = $this->getExportData($filters);
$exporter = AppServiceExporterFactory::create($exportType);
foreach ($generator as $batch) {
if (!$exporter->writeBatch($batch)) {
return false;
}
// 更新进度
$this->updateProgress($data['task_id'], $exporter->getProgress());
}
return $exporter->finalize();
}
/**
* 默认处理器
*/
private function handleDefault(array $data): bool
{
// 自定义任务处理逻辑
return true;
}
/**
* 获取导出数据(生成器模式)
*/
private function getExportData(array $filters): Generator
{
$page = 1;
$pageSize = 1000;
do {
$data = $this->fetchData($filters, $page, $pageSize);
if (empty($data)) {
break;
}
yield $data;
$page++;
} while (count($data) === $pageSize);
}
}
第四部分:Web管理界面与监控
4.1 WebSocket实时监控
<?php
// src/WebSocket/MonitorServer.php
namespace AppWebSocket;
use SwooleWebSocketServer;
use SwooleTable;
class MonitorServer
{
private Server $server;
private Table $clientTable;
public function __construct(string $host = '0.0.0.0', int $port = 9502)
{
$this->server = new Server($host, $port);
// 创建内存表存储客户端信息
$this->clientTable = new Table(1024);
$this->clientTable->column('fd', Table::TYPE_INT);
$this->clientTable->column('last_heartbeat', Table::TYPE_INT);
$this->clientTable->create();
$this->setupCallbacks();
}
private function setupCallbacks(): void
{
$this->server->on('open', function (Server $server, $request) {
$this->clientTable->set($request->fd, [
'fd' => $request->fd,
'last_heartbeat' => time()
]);
echo "客户端 {$request->fd} 连接n";
// 发送当前系统状态
$server->push($request->fd, json_encode([
'type' => 'system_status',
'data' => $this->getSystemStatus()
]));
});
$this->server->on('message', function (Server $server, $frame) {
$data = json_decode($frame->data, true);
switch ($data['type'] ?? '') {
case 'heartbeat':
$this->updateHeartbeat($frame->fd);
break;
case 'subscribe':
$this->handleSubscribe($frame->fd, $data['channels']);
break;
}
});
$this->server->on('close', function (Server $server, $fd) {
$this->clientTable->del($fd);
echo "客户端 {$fd} 断开连接n";
});
}
/**
* 广播任务状态更新
*/
public function broadcastTaskUpdate(array $taskData): void
{
$message = json_encode([
'type' => 'task_update',
'data' => $taskData,
'timestamp' => time()
]);
foreach ($this->clientTable as $row) {
$this->server->push($row['fd'], $message);
}
}
/**
* 获取系统状态
*/
private function getSystemStatus(): array
{
return [
'worker_count' => count($this->processes),
'queue_size' => $this->getQueueSize(),
'processed_today' => $this->getProcessedCount(),
'failed_today' => $this->getFailedCount(),
'system_load' => sys_getloadavg()[0]
];
}
public function start(): void
{
echo "WebSocket监控服务器启动在 0.0.0.0:9502n";
$this->server->start();
}
}
4.2 任务管理API
<?php
// src/Controller/TaskController.php
namespace AppController;
use SwooleHttpRequest;
use SwooleHttpResponse;
use AppQueueRedisQueue;
use AppTaskTaskMessage;
class TaskController
{
private RedisQueue $queue;
public function __construct()
{
$this->queue = new RedisQueue([
'host' => '127.0.0.1',
'port' => 6379
]);
}
/**
* 提交新任务
*/
public function submit(Request $request, Response $response): void
{
$data = json_decode($request->rawContent(), true);
$message = new TaskMessage(
$data['type'],
$data['data'],
$data['priority'] ?? 1,
$data['delay'] ?? null
);
$taskId = $this->queue->push($message);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'code' => 0,
'message' => '任务提交成功',
'data' => [
'task_id' => $taskId,
'message_id' => $message->getId()
]
]));
}
/**
* 查询任务状态
*/
public function status(Request $request, Response $response): void
{
$taskId = $request->get['id'] ?? '';
$status = $this->queue->getStatus($taskId);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'code' => 0,
'data' => $status
]));
}
/**
* 获取任务统计
*/
public function stats(Request $request, Response $response): void
{
$period = $request->get['period'] ?? 'today';
$stats = [
'total' => $this->queue->getTotalCount($period),
'success' => $this->queue->getSuccessCount($period),
'failed' => $this->queue->getFailedCount($period),
'processing' => $this->queue->getProcessingCount(),
'avg_time' => $this->queue->getAverageProcessTime($period)
];
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'code' => 0,
'data' => $stats
]));
}
/**
* 重试失败任务
*/
public function retry(Request $request, Response $response): void
{
$taskId = $request->get['id'] ?? '';
$result = $this->queue->retryFailedTask($taskId);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'code' => $result ? 0 : 1,
'message' => $result ? '重试成功' : '重试失败'
]));
}
}
第五部分:部署与优化
5.1 Docker部署配置
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: ${DB_PASSWORD}
MYSQL_DATABASE: task_queue
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
php-worker:
build:
context: .
dockerfile: Dockerfile.worker
depends_on:
- redis
- mysql
environment:
REDIS_HOST: redis
MYSQL_HOST: mysql
deploy:
replicas: 3
restart_policy:
condition: on-failure
websocket:
build:
context: .
dockerfile: Dockerfile.websocket
ports:
- "9502:9502"
depends_on:
- redis
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- php-worker
volumes:
redis_data:
mysql_data:
5.2 性能优化建议
- 连接池管理:使用Swoole的连接池复用Redis/MySQL连接
- 内存优化:合理设置Swoole的worker_num和task_worker_num
- 监控告警:集成Prometheus + Grafana监控系统指标
- 日志分级:使用Monolog实现结构化日志
- 熔断降级:实现任务处理的熔断机制
5.3 启动脚本
#!/bin/bash
# start.sh
# 启动Worker进程
php src/Worker/start.php --workers=4 --daemon
# 启动WebSocket监控
php src/WebSocket/monitor.php --daemon
# 启动HTTP API服务
php -S 0.0.0.0:8080 -t public/ &
echo "任务队列系统启动完成"
echo "- Worker进程: 4个"
echo "- 监控面板: http://localhost:9502"
echo "- API接口: http://localhost:8080"
第六部分:实战应用场景
6.1 电商系统应用
// 订单处理任务
$orderMessage = new TaskMessage('order_process', [
'order_id' => $orderId,
'actions' => [
'deduct_inventory', // 扣减库存
'generate_invoice', // 生成发票
'send_confirmation', // 发送确认邮件
'notify_logistics' // 通知物流
]
], 2); // 较高优先级
$queue->push($orderMessage);
6.2 内容管理系统
// 内容发布任务链
$publishMessage = new TaskMessage('content_publish', [
'content_id' => $contentId,
'steps' => [
'generate_static_html', // 生成静态页面
'update_sitemap', // 更新站点地图
'push_cdn', // 推送到CDN
'notify_subscribers', // 通知订阅者
'post_to_social' // 发布到社交媒体
]
], 1, 60); // 延迟60秒发布
$queue->push($publishMessage);
6.3 大数据处理
// 数据分析任务
$analysisMessage = new TaskMessage('data_analysis', [
'dataset' => 'user_behavior',
'start_date' => '2023-01-01',
'end_date' => '2023-10-31',
'analytics' => [
'user_retention',
'conversion_funnel',
'behavior_clustering'
]
], 3); // 低优先级,后台运行
$queue->push($analysisMessage);
总结与最佳实践
核心优势
- 高性能:基于Swoole协程,支持数万并发任务
- 高可靠:Redis持久化 + 失败重试机制
- 易扩展:分布式架构,支持水平扩展
- 实时监控:WebSocket实时推送任务状态
- 生产就绪:包含完整的错误处理和日志记录
注意事项
- 确保Redis和MySQL的高可用配置
- 合理设置任务超时时间和重试策略
- 实现任务的幂等性处理
- 定期清理已完成的任务数据
- 监控系统资源使用情况
扩展方向
1. 集成Kafka作为消息中间件
2. 实现任务依赖关系管理
3. 添加任务优先级队列
4. 支持分布式事务
5. 集成机器学习任务调度
// 代码高亮辅助
document.addEventListener(‘DOMContentLoaded’, function() {
const codeBlocks = document.querySelectorAll(‘pre code’);
codeBlocks.forEach(block => {
// 添加行号
const lines = block.textContent.split(‘n’);
if (lines.length > 1) {
const lineNumbers = lines.map((_, i) =>
`${i + 1}`
).join(”);
const wrapper = document.createElement(‘div’);
wrapper.className = ‘code-wrapper’;
wrapper.innerHTML = `
`;
block.innerHTML = ”;
block.appendChild(wrapper);
}
// 添加复制按钮
const copyBtn = document.createElement(‘button’);
copyBtn.textContent = ‘复制’;
copyBtn.className = ‘copy-btn’;
copyBtn.onclick = function() {
navigator.clipboard.writeText(block.textContent)
.then(() => {
copyBtn.textContent = ‘已复制!’;
setTimeout(() => {
copyBtn.textContent = ‘复制’;
}, 2000);
});
};
block.parentNode.insertBefore(copyBtn, block);
});
});

