PHP8.2纤程(Fiber)实战:构建可中断的批量文件处理系统
一、纤程与传统协程对比
特性 |
纤程(Fiber) |
协程(Coroutine) |
PHP版本要求 |
PHP ≥ 8.1 |
需要Swoole/RxPHP等扩展 |
执行控制 |
完全用户空间调度 |
依赖扩展调度器 |
内存占用 |
每个纤程约8KB |
通常20-50KB |
适用场景 |
同步代码可中断处理 |
高并发网络IO |
二、文件处理系统核心实现
1. 纤程任务管理器
class FiberTaskManager {
private $fibers = [];
private $fileQueue;
private $isPaused = false;
private $checkpointFile = 'progress.cpt';
public function __construct(array $files) {
$this->fileQueue = new SplQueue();
foreach ($files as $file) {
$this->fileQueue->enqueue($file);
}
$this->loadProgress();
}
public function start() {
while (!$this->fileQueue->isEmpty()) {
if ($this->isPaused) {
$this->saveProgress();
return;
}
$file = $this->fileQueue->dequeue();
$fiber = new Fiber(function() use ($file) {
$this->processFile($file);
});
$this->fibers[] = $fiber;
$fiber->start();
}
}
private function processFile(string $path) {
$handler = fopen($path, 'r');
while (!feof($handler)) {
// 每处理100行检查中断状态
if (Fiber::getCurrent()->isSuspended()) {
fclose($handler);
return;
}
$line = fgets($handler);
$this->transformLine($line);
Fiber::suspend(); // 主动让出控制权
}
fclose($handler);
}
}
2. 断点续传实现
private function saveProgress() {
$progress = [
'remaining_files' => iterator_to_array($this->fileQueue),
'timestamp' => time()
];
file_put_contents(
$this->checkpointFile,
serialize($progress)
);
}
private function loadProgress() {
if (file_exists($this->checkpointFile)) {
$progress = unserialize(
file_get_contents($this->checkpointFile)
);
foreach ($progress['remaining_files'] as $file) {
$this->fileQueue->enqueue($file);
}
unlink($this->checkpointFile);
}
}
public function pause() {
$this->isPaused = true;
}
public function resume() {
$this->isPaused = false;
$this->start();
}
3. 文件内容转换处理
private function transformLine(string &$line) {
// CSV转JSON示例
if (pathinfo($file, PATHINFO_EXTENSION) === 'csv') {
$data = str_getcsv($line);
$line = json_encode(array_combine(
['id', 'name', 'value'],
$data
));
}
// 其他格式转换...
}
public function setTransformer(callable $transformer) {
$this->transformer = $transformer;
}
三、性能优化方案
- 内存控制:使用生成器逐行处理大文件
- 并行处理:配合parallel扩展实现多核利用
- 进度快照:二进制格式保存检查点
- 异常恢复:记录失败文件单独重试
执行流程图
开始 → 读取检查点 → 创建纤程 → 处理文件
↑ ↓
← 暂停/中断 ←
四、实际应用场景
- 日志分析系统:TB级日志文件处理
- 数据迁移工具:数据库导出文件转换
- 媒体处理流水线:图片/视频元数据提取
- 科学计算:大型数据集分块处理