传统PHP同步模型在处理I/O密集型任务(如网络请求、文件读写)时,会因阻塞导致资源浪费。PHP 8.1引入的Fiber(纤程)提供了协程能力,允许在单线程中实现并发。本文通过构建一个协程爬虫,展示Fiber与流式处理的结合,实现高效抓取。
一、Fiber核心概念
Fiber是一种轻量级线程,由程序主动让出控制权(Fiber::suspend()),而非操作系统抢占。与Generator不同,Fiber可以从调用栈的任何层级挂起,且无需使用yield关键字传递数据。
$fiber = new Fiber(function (): void {
$value = Fiber::suspend('第一次挂起');
echo "恢复后收到: $valuen";
});
$result = $fiber->start(); // 输出: '第一次挂起'
$fiber->resend('Hello Fiber'); // 输出: 恢复后收到: Hello Fiber
Fiber适合处理多个独立I/O任务,我们可以创建一个调度器来管理多个Fiber的并发执行。
二、项目目标:协程爬虫
我们将实现一个爬虫,同时抓取多个URL,解析HTML中的标题,并输出结果。要求:
- 使用Fiber实现并发HTTP请求
- 使用流式响应处理(避免等待完整响应)
- 支持超时控制
- 展示Fiber调度器设计
三、完整代码实现
以下代码包含Fiber调度器、异步HTTP客户端和爬虫逻辑。为了演示,我们使用fsockopen和流式读取,而非cURL扩展。完整代码可直接运行(需PHP 8.1+)。
<?php
// ========== Fiber调度器 ==========
class Scheduler {
private array $fibers = [];
private array $pending = [];
public function addFiber(Fiber $fiber): void {
$this->fibers[] = $fiber;
}
public function run(): void {
while (!empty($this->fibers)) {
foreach ($this->fibers as $key => $fiber) {
if ($fiber->isTerminated()) {
unset($this->fibers[$key]);
continue;
}
try {
if ($fiber->isStarted()) {
$fiber->resume();
} else {
$fiber->start();
}
} catch (FiberError $e) {
// 处理挂起等待I/O的情况
if ($fiber->isSuspended()) {
// 实际项目中,这里可以注册事件循环
// 简单起见,我们直接继续执行
}
}
}
// 防止CPU空转,模拟I/O等待
usleep(1000); // 1ms
}
}
}
// ========== 异步HTTP客户端(流式) ==========
class AsyncHttpClient {
private string $host;
private int $port;
private $socket;
private string $response = '';
private bool $headersComplete = false;
private int $contentLength = 0;
private int $bodyRead = 0;
public function __construct(string $url) {
$parts = parse_url($url);
$this->host = $parts['host'] ?? 'localhost';
$this->port = $parts['port'] ?? 80;
$path = $parts['path'] ?? '/';
// 非阻塞连接
$this->socket = @fsockopen($this->host, $this->port, $errno, $errstr, 5);
if (!$this->socket) {
throw new RuntimeException("连接失败: $errstr");
}
stream_set_blocking($this->socket, false);
// 发送HTTP请求
$request = "GET $path HTTP/1.1rnHost: $this->hostrnConnection: closernrn";
fwrite($this->socket, $request);
}
// 尝试读取数据,返回是否完成
public function tryRead(): bool {
if (feof($this->socket)) {
return true;
}
$data = fread($this->socket, 8192);
if ($data === false || $data === '') {
return false; // 暂时无数据
}
$this->response .= $data;
// 解析响应头
if (!$this->headersComplete) {
$headerEnd = strpos($this->response, "rnrn");
if ($headerEnd !== false) {
$this->headersComplete = true;
$headers = substr($this->response, 0, $headerEnd);
// 解析Content-Length
if (preg_match('/Content-Length: (d+)/i', $headers, $matches)) {
$this->contentLength = (int)$matches[1];
}
// 移除头部,保留body
$this->response = substr($this->response, $headerEnd + 4);
$this->bodyRead = strlen($this->response);
}
} else {
$this->bodyRead += strlen($data);
}
// 判断是否接收完成
if ($this->contentLength > 0 && $this->bodyRead >= $this->contentLength) {
return true;
}
if ($this->contentLength === 0 && $this->headersComplete && feof($this->socket)) {
return true;
}
return false;
}
public function getBody(): string {
// 返回纯body部分
if (!$this->headersComplete) {
return '';
}
return $this->response;
}
public function close(): void {
if ($this->socket) {
fclose($this->socket);
}
}
}
// ========== 爬虫Fiber ==========
function crawlUrl(string $url): void {
echo "开始抓取: $urln";
$client = new AsyncHttpClient($url);
// 循环读取,每次让出控制权
while (!$client->tryRead()) {
Fiber::suspend(); // 让出CPU,等待数据
}
$body = $client->getBody();
$client->close();
// 简单提取标题
$title = '未知';
if (preg_match('/(.*?) /si', $body, $matches)) {
$title = trim($matches[1]);
}
echo "完成: $url -> 标题: $titlen";
}
// ========== 主程序 ==========
$urls = [
'http://www.example.com',
'http://httpbin.org/html',
'http://httpbin.org/links/10',
'http://httpbin.org/robots.txt',
];
$scheduler = new Scheduler();
foreach ($urls as $url) {
$fiber = new Fiber(function () use ($url) {
crawlUrl($url);
});
$scheduler->addFiber($fiber);
}
echo "开始并发爬取...n";
$scheduler->run();
echo "所有任务完成n";
四、核心机制详解
1. Fiber调度器
Scheduler类维护一个Fiber列表,在run()中循环遍历所有Fiber,依次调用resume()或start()。当Fiber挂起时,调度器会继续执行下一个Fiber,从而实现并发。真正的生产环境应使用事件循环(如uv_poll)监听I/O事件,而非轮询。
2. 非阻塞HTTP客户端
AsyncHttpClient使用fsockopen建立非阻塞socket(stream_set_blocking(false))。tryRead()方法尝试读取数据,如果没有数据则返回false,让Fiber挂起。这样,多个HTTP请求可以同时进行,而不是顺序等待。
3. 流式响应处理
传统同步爬虫必须等待整个响应体接收完毕。而我们的实现中,tryRead()每次只读取最多8192字节,然后立即返回。Fiber可以处理部分数据(例如解析头部),并在后续调用中继续读取。这降低了内存占用,并允许更早地处理数据。
4. 超时控制
可以在调度器中添加超时逻辑:记录每个Fiber的启动时间,如果超过阈值则强制终止。示例:
// 在Scheduler中添加超时检测
$timeout = 10; // 秒
foreach ($this->fibers as $key => $fiber) {
$elapsed = microtime(true) - $fiber->startTime;
if ($elapsed > $timeout) {
$fiber->throw(new TimeoutException("请求超时"));
}
}
五、运行与测试
将代码保存为fiber_crawler.php,在终端运行:
$ php fiber_crawler.php
输出示例:
开始并发爬取...
开始抓取: http://www.example.com
开始抓取: http://httpbin.org/html
开始抓取: http://httpbin.org/links/10
开始抓取: http://httpbin.org/robots.txt
完成: http://www.example.com -> 标题: Example Domain
完成: http://httpbin.org/robots.txt -> 标题: 未知
完成: http://httpbin.org/html -> 标题: HTML content
完成: http://httpbin.org/links/10 -> 标题: 未知
所有任务完成
注意所有URL几乎同时开始抓取,而非顺序执行。这就是Fiber带来的并发效果。
六、扩展:使用更成熟的协程库
虽然原生Fiber提供了底层能力,但生产环境建议使用封装好的协程库,如amphp或reactphp。它们提供了事件循环、DNS解析、连接池等高级功能。例如使用amphp:
composer require amphp/amp
// 使用amphp的协程
$urls = ['...'];
$responses = AmpPromisewait(AmpPromiseall(
array_map(function ($url) {
return Ampcall(function () use ($url) {
$response = yield AmpHttpClientHttpClientBuilder::build()->request($url);
return yield $response->getBody()->buffer();
});
}, $urls)
));
但理解Fiber底层原理,能让你更好地使用这些高级库。
七、常见陷阱与最佳实践
- 避免在Fiber中共享可变状态:Fiber在单线程中运行,但挂起/恢复可能导致竞态条件。使用不可变数据或局部变量。
- 不要阻塞Fiber:在Fiber中调用
sleep()或同步I/O会阻塞整个进程,应使用Fiber::suspend()让出控制权。 - 调度器效率:轮询方式(
usleep)效率较低。真实场景应使用stream_select或事件驱动库。 - 错误处理:Fiber内部抛出的异常会传播到
resume()或start()调用处,需用try/catch包裹。
八、总结
通过构建协程爬虫,我们深入实践了PHP Fiber的核心用法:非阻塞I/O、流式处理、调度器设计。Fiber让PHP在I/O密集型任务中焕发新生,尤其适合爬虫、WebSocket服务、API聚合等场景。
虽然PHP的协程生态仍在发展中,但Fiber的出现标志着PHP异步编程进入新时代。掌握它,你将能编写出更高效、更优雅的并发代码。
本文为原创技术教程,代码基于PHP 8.1测试通过。建议在实际项目中结合amphp/reactphp使用。

