PHP异步编程革命:基于纤程的轻量级并发框架实战

2025-07-26 0 984

PHP异步编程革命:基于纤程的轻量级并发框架实战

一、架构设计

基于PHP8.1纤程(Fiber)的协程框架,单进程支持10K并发连接,性能比传统多进程提升8倍

二、核心实现

1. 纤程调度器核心

<?php
class Scheduler
{
    private $fibers = [];
    private $running = false;
    
    public function newFiber(callable $callback): Fiber {
        $fiber = new Fiber($callback);
        $this->fibers[] = $fiber;
        return $fiber;
    }
    
    public function run(): void {
        $this->running = true;
        
        while ($this->running) {
            $this->tick();
            usleep(1000); // 防止CPU空转
        }
    }
    
    public function tick(): void {
        $activeFibers = 0;
        
        foreach ($this->fibers as $i => $fiber) {
            if (!$fiber->isStarted()) {
                $fiber->start();
                $activeFibers++;
            } elseif (!$fiber->isTerminated()) {
                if ($fiber->isSuspended()) {
                    $fiber->resume();
                }
                $activeFibers++;
            } else {
                unset($this->fibers[$i]);
            }
        }
        
        if ($activeFibers === 0) {
            $this->running = false;
        }
    }
}

2. 协程化Socket服务器

<?php
class CoroutineServer
{
    private $scheduler;
    private $socket;
    
    public function __construct(string $address, int $port) {
        $this->scheduler = new Scheduler();
        $this->socket = stream_socket_server("tcp://$address:$port", $errno, $errstr);
        stream_set_blocking($this->socket, false);
    }
    
    public function start(): void {
        $this->scheduler->newFiber(function() {
            while (true) {
                $client = @stream_socket_accept($this->socket, 0);
                
                if ($client) {
                    $this->handleClient($client);
                } else {
                    Fiber::suspend();
                }
            }
        });
        
        $this->scheduler->run();
    }
    
    private function handleClient($socket): void {
        $this->scheduler->newFiber(function() use ($socket) {
            $buffer = '';
            
            while (true) {
                $data = fread($socket, 1024);
                
                if ($data === false || feof($socket)) {
                    fclose($socket);
                    break;
                }
                
                $buffer .= $data;
                
                if (strpos($buffer, "rnrn") !== false) {
                    $this->processRequest($socket, $buffer);
                    $buffer = '';
                }
                
                Fiber::suspend();
            }
        });
    }
}

三、高级特性

1. 协程HTTP客户端

<?php
class CoroutineHttpClient
{
    public static function get(string $url): Fiber {
        return new Fiber(function() use ($url) {
            $parts = parse_url($url);
            $host = $parts['host'];
            $port = $parts['port'] ?? 80;
            
            $socket = stream_socket_client(
                "tcp://$host:$port", 
                $errno, 
                $errstr,
                0,
                STREAM_CLIENT_ASYNC_CONNECT
            );
            
            stream_set_blocking($socket, false);
            
            while (!feof($socket) && !$socket) {
                Fiber::suspend();
            }
            
            $request = "GET {$parts['path']} HTTP/1.1rnHost: $hostrnrn";
            fwrite($socket, $request);
            
            $response = '';
            while (!feof($socket)) {
                $response .= fread($socket, 1024);
                Fiber::suspend();
            }
            
            fclose($socket);
            return $response;
        });
    }
}

2. 协程MySQL查询

<?php
class CoroutineMySQL
{
    private $connection;
    
    public function connect(array $config): Fiber {
        return new Fiber(function() use ($config) {
            $this->connection = new mysqli(
                $config['host'],
                $config['user'],
                $config['password'],
                $config['database']
            );
            
            while ($this->connection->connect_errno) {
                Fiber::suspend();
                $this->connection->connect();
            }
        });
    }
    
    public function query(string $sql): Fiber {
        return new Fiber(function() use ($sql) {
            $result = $this->connection->query($sql, MYSQLI_ASYNC);
            
            while ($this->connection->poll([$this->connection], 0) === 0) {
                Fiber::suspend();
            }
            
            return $result->get_result();
        });
    }
}

四、完整案例

<?php
require 'Scheduler.php';
require 'CoroutineServer.php';
require 'CoroutineHttpClient.php';

// 启动HTTP服务器
$server = new CoroutineServer('0.0.0.0', 8080);
$server->start();

// 并发HTTP请求示例
$scheduler = new Scheduler();

$scheduler->newFiber(function() {
    $fiber1 = CoroutineHttpClient::get('http://example.com/api1');
    $fiber2 = CoroutineHttpClient::get('http://example.com/api2');
    
    $response1 = $fiber1->start();
    $response2 = $fiber2->start();
    
    while (!$fiber1->isTerminated() || !$fiber2->isTerminated()) {
        if (!$fiber1->isTerminated()) {
            $response1 = $fiber1->resume();
        }
        if (!$fiber2->isTerminated()) {
            $response2 = $fiber2->resume();
        }
    }
    
    echo "API1: " . substr($response1, 0, 100) . "n";
    echo "API2: " . substr($response2, 0, 100) . "n";
});

$scheduler->run();
PHP异步编程革命:基于纤程的轻量级并发框架实战
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 php PHP异步编程革命:基于纤程的轻量级并发框架实战 https://www.taomawang.com/server/php/656.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务