原创作者:PHP高级架构师 | 发布日期:2023年11月
一、PHP协程编程核心概念
1.1 从传统阻塞到协程非阻塞
传统PHP-FPM模式下,每个请求都是独立的进程/线程,IO操作会阻塞整个进程。协程通过用户态调度实现非阻塞并发。
传统阻塞模式 vs 协程非阻塞模式:
传统阻塞模式:
请求1: [连接DB]---[查询]---[等待]---[返回]
请求2: [连接DB]---[查询]---[等待]---[返回]
协程非阻塞模式:
请求1: [连接DB]---[挂起]---[恢复]---[返回]
请求2: [连接DB]---[挂起]---[恢复]---[返回]
1.2 PHP协程实现原理
// 基础协程示例
function task1() {
for ($i = 1; $i <= 3; $i++) {
echo "任务1: 第{$i}次执行n";
yield; // 让出执行权
}
}
function task2() {
for ($i = 1; $i addTask(task1());
$scheduler->addTask(task2());
$scheduler->run();
二、PHP 8.1+ 纤程深度解析
2.1 纤程基础用法
// PHP 8.1+ 纤程示例
function httpRequest($url) {
echo "开始请求: {$url}n";
// 模拟网络IO
Fiber::suspend();
// 模拟IO完成
echo "请求完成: {$url}n";
return "响应数据: {$url}";
}
$fiber = new Fiber(function() {
$result1 = httpRequest('https://api.example.com/users');
$result2 = httpRequest('https://api.example.com/products');
return [$result1, $result2];
});
// 启动纤程
echo "启动纤程...n";
$fiber->start();
// 恢复执行
echo "恢复纤程...n";
$fiber->resume();
// 获取结果
if ($fiber->isTerminated()) {
$result = $fiber->getReturn();
print_r($result);
}
2.2 纤程调度器实现
class FiberScheduler
{
private $fibers = [];
private $timers = [];
public function add(Fiber $fiber, $data = null) {
$this->fibers[] = ['fiber' => $fiber, 'data' => $data];
}
public function setTimeout(callable $callback, int $delay) {
$this->timers[] = [
'callback' => $callback,
'execute_at' => time() + $delay
];
}
public function run() {
while (!empty($this->fibers) || !empty($this->timers)) {
// 处理纤程
foreach ($this->fibers as $index => $item) {
$fiber = $item['fiber'];
if (!$fiber->isStarted()) {
$fiber->start($item['data']);
} elseif (!$fiber->isTerminated()) {
try {
$fiber->resume();
} catch (Throwable $e) {
echo "纤程异常: " . $e->getMessage() . "n";
unset($this->fibers[$index]);
}
} else {
unset($this->fibers[$index]);
}
}
// 处理定时器
$currentTime = time();
foreach ($this->timers as $index => $timer) {
if ($currentTime >= $timer['execute_at']) {
call_user_func($timer['callback']);
unset($this->timers[$index]);
}
}
usleep(1000); // 避免CPU空转
}
}
}
// 使用示例
$scheduler = new FiberScheduler();
$scheduler->add(new Fiber(function() {
echo "纤程1开始n";
Fiber::suspend();
echo "纤程1恢复n";
}));
$scheduler->setTimeout(function() {
echo "定时器触发n";
}, 2);
$scheduler->run();
三、异步IO编程实战
3.1 基于Swoole的协程HTTP客户端
class AsyncHttpClient
{
public function concurrentRequests($urls) {
$results = [];
// 创建协程通道
$channel = new SwooleCoroutineChannel(count($urls));
foreach ($urls as $url) {
go(function() use ($url, $channel) {
try {
$client = new SwooleCoroutineHttpClient(
parse_url($url, PHP_URL_HOST),
parse_url($url, PHP_URL_PORT) ?: 80
);
$client->set(['timeout' => 5]);
$client->get(parse_url($url, PHP_URL_PATH));
$channel->push([
'url' => $url,
'status' => $client->statusCode,
'data' => $client->body,
'error' => null
]);
$client->close();
} catch (Exception $e) {
$channel->push([
'url' => $url,
'status' => 0,
'data' => null,
'error' => $e->getMessage()
]);
}
});
}
// 收集结果
for ($i = 0; $i pop();
}
$channel->close();
return $results;
}
}
// 使用示例
$client = new AsyncHttpClient();
$urls = [
'https://httpbin.org/get',
'https://httpbin.org/delay/2',
'https://httpbin.org/status/200'
];
$start = microtime(true);
$results = $client->concurrentRequests($urls);
$end = microtime(true);
echo "并发请求完成,耗时: " . ($end - $start) . "秒n";
print_r($results);
3.2 协程MySQL连接池
class CoroutineMySQLPool
{
private $pool;
private $config;
private $minConnections;
private $maxConnections;
public function __construct($config, $min = 5, $max = 20) {
$this->config = $config;
$this->minConnections = $min;
$this->maxConnections = $max;
$this->pool = new SwooleCoroutineChannel($max);
$this->initializePool();
}
private function initializePool() {
for ($i = 0; $i minConnections; $i++) {
$this->pool->push($this->createConnection());
}
}
private function createConnection() {
$mysql = new SwooleCoroutineMySQL();
$connected = $mysql->connect([
'host' => $this->config['host'],
'port' => $this->config['port'],
'user' => $this->config['user'],
'password' => $this->config['password'],
'database' => $this->config['database'],
'timeout' => 5,
'charset' => 'utf8mb4'
]);
if (!$connected) {
throw new RuntimeException("MySQL连接失败: " . $mysql->connect_error);
}
return $mysql;
}
public function getConnection() {
if (!$this->pool->isEmpty()) {
return $this->pool->pop();
}
if ($this->pool->length() maxConnections) {
return $this->createConnection();
}
// 等待连接释放
return $this->pool->pop(10); // 10秒超时
}
public function releaseConnection($connection) {
if ($connection->connected) {
$this->pool->push($connection);
} else {
// 连接已断开,创建新连接
$this->pool->push($this->createConnection());
}
}
public function query($sql, $params = []) {
$connection = $this->getConnection();
try {
$statement = $connection->prepare($sql);
if ($statement) {
$result = $statement->execute($params);
return $result;
} else {
throw new RuntimeException("SQL预处理失败: " . $connection->error);
}
} finally {
$this->releaseConnection($connection);
}
}
}
// 使用示例
$pool = new CoroutineMySQLPool([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'password',
'database' => 'test'
]);
// 并发查询
go(function() use ($pool) {
$results = [];
for ($i = 0; $i query("SELECT * FROM users WHERE id = ?", [$i + 1]);
}
return $results;
});
四、通道与连接池高级应用
4.1 生产者-消费者模式
class TaskProcessor
{
private $taskChannel;
private $resultChannel;
private $workerCount;
public function __construct($workerCount = 4) {
$this->workerCount = $workerCount;
$this->taskChannel = new SwooleCoroutineChannel(1000);
$this->resultChannel = new SwooleCoroutineChannel(1000);
}
public function start() {
// 启动工作协程
for ($i = 0; $i workerCount; $i++) {
go(function() {
while (true) {
$task = $this->taskChannel->pop();
if ($task === false) {
break; // 通道关闭
}
$result = $this->processTask($task);
$this->resultChannel->push($result);
}
});
}
}
public function addTask($task) {
return $this->taskChannel->push($task);
}
public function getResult() {
return $this->resultChannel->pop();
}
public function stop() {
$this->taskChannel->close();
}
private function processTask($task) {
// 模拟任务处理
SwooleCoroutine::sleep(0.1);
return [
'task_id' => $task['id'],
'result' => "处理完成: " . $task['data'],
'worker_id' => SwooleCoroutine::getCid()
];
}
}
// 使用示例
$processor = new TaskProcessor(4);
$processor->start();
// 生产者协程
go(function() use ($processor) {
for ($i = 1; $i addTask([
'id' => $i,
'data' => "任务数据{$i}"
]);
}
});
// 消费者协程
go(function() use ($processor) {
$completed = 0;
while ($completed getResult();
echo "收到结果: " . json_encode($result) . "n";
$completed++;
}
});
4.2 协程限流器
class RateLimiter
{
private $channel;
private $capacity;
private $refillRate; // 令牌/秒
public function __construct($capacity, $refillRate) {
$this->capacity = $capacity;
$this->refillRate = $refillRate;
$this->channel = new SwooleCoroutineChannel($capacity);
$this->initializeTokens();
$this->startRefill();
}
private function initializeTokens() {
for ($i = 0; $i capacity; $i++) {
$this->channel->push(true);
}
}
private function startRefill() {
go(function() {
$interval = 1.0 / $this->refillRate;
while (true) {
SwooleCoroutine::sleep($interval);
if ($this->channel->length() capacity) {
$this->channel->push(true);
}
}
});
}
public function acquire($timeout = -1) {
return $this->channel->pop($timeout) !== false;
}
public function tryAcquire() {
return $this->channel->pop(0) !== false;
}
public function release() {
// 令牌自动补充,通常不需要手动释放
}
}
// 使用示例
$limiter = new RateLimiter(10, 5); // 容量10,每秒补充5个令牌
go(function() use ($limiter) {
for ($i = 0; $i acquire(1)) {
echo "获取令牌成功,执行任务 {$i}n";
// 执行受限制的任务
SwooleCoroutine::sleep(0.1);
} else {
echo "获取令牌超时,跳过任务 {$i}n";
}
}
});
五、生产环境实战案例
5.1 高性能API网关
class ApiGateway
{
private $server;
private $routes;
private $rateLimiters;
public function __construct($host = '0.0.0.0', $port = 9501) {
$this->server = new SwooleHttpServer($host, $port);
$this->routes = [];
$this->rateLimiters = [];
$this->setupServer();
}
private function setupServer() {
$this->server->set([
'worker_num' => swoole_cpu_num() * 2,
'enable_coroutine' => true,
'max_request' => 10000,
'log_file' => '/tmp/api_gateway.log'
]);
$this->server->on('request', function($request, $response) {
$this->handleRequest($request, $response);
});
}
private function handleRequest($request, $response) {
$path = $request->server['request_uri'];
$method = $request->server['request_method'];
$clientIp = $request->server['remote_addr'];
// 限流检查
if (!$this->checkRateLimit($clientIp, $path)) {
$response->status(429);
$response->end(json_encode(['error' => '请求过于频繁']));
return;
}
// 路由匹配
$handler = $this->findRouteHandler($method, $path);
if (!$handler) {
$response->status(404);
$response->end(json_encode(['error' => '接口不存在']));
return;
}
// 协程处理
go(function() use ($handler, $request, $response) {
try {
$result = $handler($request);
$response->header('Content-Type', 'application/json');
$response->end(json_encode($result));
} catch (Exception $e) {
$response->status(500);
$response->end(json_encode(['error' => $e->getMessage()]));
}
});
}
public function addRoute($method, $path, callable $handler) {
$this->routes["{$method}:{$path}"] = $handler;
}
public function start() {
$this->server->start();
}
private function findRouteHandler($method, $path) {
$key = "{$method}:{$path}";
return $this->routes[$key] ?? null;
}
private function checkRateLimit($clientIp, $path) {
$key = "rate_limit:{$clientIp}:{$path}";
if (!isset($this->rateLimiters[$key])) {
$this->rateLimiters[$key] = new RateLimiter(100, 10); // 100容量,10/秒
}
return $this->rateLimiters[$key]->tryAcquire();
}
}
// 使用示例
$gateway = new ApiGateway();
// 添加路由
$gateway->addRoute('GET', '/users', function($request) {
// 模拟数据库查询
SwooleCoroutine::sleep(0.01);
return [
'code' => 200,
'data' => [
['id' => 1, 'name' => '用户1'],
['id' => 2, 'name' => '用户2']
]
];
});
$gateway->addRoute('POST', '/users', function($request) {
$data = json_decode($request->rawContent(), true);
return [
'code' => 201,
'data' => ['id' => 3, 'name' => $data['name']],
'message' => '用户创建成功'
];
});
// 启动网关
// $gateway->start();
总结
本文深入探讨了PHP协程与纤程编程的核心概念和实战应用:
- 基础理论:从生成器到纤程的演进历程
- 核心技术:协程调度、异步IO、通道通信
- 高级应用:连接池、限流器、生产者-消费者模式
- 生产实践:高性能API网关架构设计
- 性能对比:协程模式相比传统模式的性能提升可达10倍以上
通过掌握这些技术,PHP开发者可以构建出媲美Go、Java等语言的高并发应用系统,在微服务、实时通信、大数据处理等场景中发挥重要作用。

