构建微服务架构下的统一入口与流量管控系统
一、API网关核心价值
现代分布式系统中，API网关的关键作用如下：
功能维度 | 传统架构 | 网关架构 |
---|---|---|
入口管理 | 多入口分散 | 统一入口管控 |
安全防护 | 各服务独立实现 | 集中式安全策略 |
流量控制 | 难以全局控制 | 精细化流量管理 |
监控分析 | 分散日志收集 | 统一监控大盘 |
二、核心架构设计
1. 系统分层架构
客户端 → API网关层 → 业务服务层 → 数据服务层
↑ ↑ ↑
鉴权/限流 请求转发 业务逻辑处理
2. 核心处理流程
请求接收 → 预处理 → 路由匹配 → 过滤器链 → 服务调用 → 响应处理
↑ ↑ ↑ ↑ ↑
SSL终止 参数校验 负载均衡 认证/限流/日志 数据聚合
三、关键技术实现
1. 高性能路由引擎
<?php
/**
 * Resolves an HTTP method and URI to the backend service that should handle it.
 *
 * Routes are kept per upper-cased HTTP method as a map from a compiled regular
 * expression to its configuration. Placeholders written as {name} in a route
 * path become named capture groups and are returned as request parameters.
 */
class Router
{
    /** @var array<string, array<string, array>> Compiled routes keyed by method, then pattern. */
    private $routes = [];

    /** @var string|null Path of the PHP file used to cache compiled routes, if any. */
    private $cacheFile;

    /**
     * @param string|null $cacheFile Optional path of a PHP file holding the compiled route table.
     */
    public function __construct(?string $cacheFile = null)
    {
        $this->cacheFile = $cacheFile;
        $this->loadRoutes();
    }

    /**
     * Populate the route table from the cache file when present, otherwise from
     * the gateway_routes table, writing the cache afterwards.
     *
     * The cache stores already-compiled patterns, so a cache file written by an
     * earlier version of compileRoute() must be deleted to pick up new patterns.
     */
    private function loadRoutes()
    {
        if ($this->cacheFile && file_exists($this->cacheFile)) {
            $cached = include $this->cacheFile;
            // Only trust the cache when it really yields a route table; a truncated
            // or foreign file falls through to a rebuild instead of poisoning $routes.
            if (is_array($cached)) {
                $this->routes = $cached;
                return;
            }
        }

        // Load the enabled routes from the database.
        $routes = Db::table('gateway_routes')
            ->where('status', 1)
            ->select()
            ->toArray();

        foreach ($routes as $route) {
            $this->addRoute(
                $route['method'],
                $route['path'],
                $route['service'],
                (bool) ($route['strip_prefix'] ?? false)
            );
        }

        if ($this->cacheFile) {
            // LOCK_EX keeps concurrent workers from interleaving their writes.
            file_put_contents(
                $this->cacheFile,
                '<?php return ' . var_export($this->routes, true) . ';',
                LOCK_EX
            );
        }
    }

    /**
     * Register a route.
     *
     * @param string $method      HTTP method; compared case-insensitively.
     * @param string $path        Route path, optionally containing {name} placeholders.
     * @param string $service     Identifier of the target service.
     * @param bool   $stripPrefix Flag stored with the route and returned by match().
     */
    public function addRoute(string $method, string $path, string $service, bool $stripPrefix = false)
    {
        $method = strtoupper($method);
        $pattern = $this->compileRoute($path);

        $this->routes[$method][$pattern] = [
            'service' => $service,
            'params' => [],
            'strip_prefix' => $stripPrefix,
        ];
    }

    /**
     * Turn a route path into an anchored regular expression.
     *
     * Each {name} placeholder becomes a named group matching one path segment.
     * Everything else in the path is kept as written, so it is still interpreted
     * as regular-expression syntax.
     */
    private function compileRoute(string $path): string
    {
        // Group names may not start with a digit, hence the explicit first character class.
        $pattern = preg_replace('/\{([A-Za-z_]\w*)\}/', '(?P<$1>[^/]+)', $path);

        return '#^' . $pattern . '$#';
    }

    /**
     * Find the first registered route matching the request.
     *
     * @param string $method HTTP method of the request.
     * @param string $uri    Request path to match.
     *
     * @return array|null Array with 'service', 'params' (named captures only) and
     *                    'strip_prefix', or null when nothing matches.
     */
    public function match(string $method, string $uri): ?array
    {
        $method = strtoupper($method);

        if (!isset($this->routes[$method])) {
            return null;
        }

        foreach ($this->routes[$method] as $pattern => $config) {
            if (preg_match($pattern, $uri, $matches) !== 1) {
                continue;
            }

            // preg_match returns every group twice (by index and by name);
            // keeping string keys leaves only the named parameters.
            $params = array_filter($matches, 'is_string', ARRAY_FILTER_USE_KEY);

            return [
                'service' => $config['service'],
                'params' => $params,
                'strip_prefix' => $config['strip_prefix'],
            ];
        }

        return null;
    }
}
2. 过滤器链实现
<?php
/**
 * A single step in the gateway's request-processing chain.
 */
interface Filter
{
/**
 * Process the request and produce a response.
 *
 * @param Request  $request The request being processed.
 * @param callable $next    Continuation for the rest of the chain; call it with
 *                          the request to proceed, or return a Response without
 *                          calling it to stop processing here.
 *
 * @return Response
 */
public function handle(Request $request, callable $next): Response;
}
/**
 * Runs a request through an ordered list of filters and then a final handler.
 *
 * Filters execute in the order they were added; each one receives a callable
 * that continues with the remaining filters and, last, the handler.
 */
class FilterChain
{
    /** @var Filter[] Filters in execution order. */
    private $filters = [];

    /** @var callable|null Final handler invoked after every filter has passed the request on. */
    private $handler;

    /**
     * Append a filter to the end of the chain.
     *
     * @return self This chain, for fluent calls.
     */
    public function addFilter(Filter $filter): self
    {
        $this->filters[] = $filter;

        return $this;
    }

    /**
     * Set the callable that produces the response once all filters have run.
     *
     * @return self This chain, for fluent calls.
     */
    public function setHandler(callable $handler): self
    {
        $this->handler = $handler;

        return $this;
    }

    /**
     * Execute the chain for the given request.
     *
     * @return Response Whatever the outermost filter (or the handler, when there
     *                  are no filters) returns.
     */
    public function run(Request $request): Response
    {
        // Fold from the last filter back to the first so that the first filter
        // added ends up as the outermost wrapper around the handler.
        $pipeline = array_reduce(
            array_reverse($this->filters),
            function ($next, $filter) {
                return function ($request) use ($filter, $next) {
                    return $filter->handle($request, $next);
                };
            },
            $this->handler
        );

        return $pipeline($request);
    }
}
// Example: authentication filter
/**
 * Rejects requests that do not carry an acceptable Authorization header.
 */
class AuthFilter implements Filter
{
    /**
     * Answer with 401 unless the Authorization header holds a valid token;
     * otherwise hand the request to the next step of the chain.
     *
     * @param Request  $request The incoming request.
     * @param callable $next    Continuation invoked with the request on success.
     *
     * @return Response
     */
    public function handle(Request $request, callable $next): Response
    {
        $token = $request->getHeader('Authorization');

        // NOTE(review): the return type of getHeader() is not visible here. Some
        // request implementations return a list of header values, so take the
        // first one rather than passing an array to validateToken().
        if (is_array($token)) {
            $token = reset($token);
        }

        // A missing header must produce a 401, not a TypeError from the
        // string-typed validateToken() parameter.
        if (!is_string($token) || $token === '' || !$this->validateToken($token)) {
            return new Response(401, ['Content-Type' => 'application/json'], [
                'code' => 401,
                'message' => 'Unauthorized',
            ]);
        }

        return $next($request);
    }

    /**
     * Decide whether the token is acceptable.
     *
     * WARNING: this is a placeholder that accepts every token it is given.
     * Real JWT verification must be implemented before this filter is relied
     * on for access control.
     */
    private function validateToken(string $token): bool
    {
        // TODO: JWT verification logic
        return true;
    }
}
四、高级功能实现
1. 智能限流系统
<?php
/**
 * Sliding-window rate limiter backed by a Redis sorted set per key.
 *
 * Each allowed request adds one member scored with its timestamp; members
 * older than the window are removed before counting.
 */
class RateLimiter
{
    /**
     * Lua script executed by Redis for each check.
     *
     * KEYS[1] = limiter key; ARGV = now, window (seconds), limit, unique member.
     * Returns 1 when the request is allowed and recorded, 0 when the limit is reached.
     */
    private const SLIDING_WINDOW_SCRIPT = <<<'LUA'
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local member = ARGV[4]
local clearBefore = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)
local currentCount = redis.call('ZCARD', key)
if currentCount >= limit then
    return 0
end
redis.call('ZADD', key, now, member)
redis.call('EXPIRE', key, window)
return 1
LUA;

    /** @var Redis Client used to run the script. */
    private $redis;

    /** @var array<string, array{limit: int, window: int}> Configured limits by key. */
    private $limits = [];

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    /**
     * Configure a limit.
     *
     * @param string $key    Limiter key; also used as the Redis key.
     * @param int    $limit  Maximum number of requests per window.
     * @param int    $window Window length in seconds.
     */
    public function addLimit(string $key, int $limit, int $window): void
    {
        $this->limits[$key] = [
            'limit' => $limit,
            'window' => $window,
        ];
    }

    /**
     * Record a request against the key and report whether it is allowed.
     *
     * @param string $key Limiter key previously passed to addLimit().
     *
     * @return bool True when the request is allowed, or when no limit is
     *              configured for the key; false when the limit is reached.
     */
    public function check(string $key): bool
    {
        if (!isset($this->limits[$key])) {
            return true;
        }

        $limit = $this->limits[$key]['limit'];
        $window = $this->limits[$key]['window'];
        $now = time();

        // Sorted-set members are unique. Using the timestamp itself as the member
        // would merge every request made within the same second into one entry,
        // so each request gets its own random member.
        $member = $now . ':' . bin2hex(random_bytes(8));

        $result = $this->redis->eval(
            self::SLIDING_WINDOW_SCRIPT,
            [$key, $now, $window, $limit, $member],
            1
        );

        return (bool) $result;
    }
}
// Usage example
$limiter = new RateLimiter($redis);
$limiter->addLimit('api:user:123', 100, 60); // at most 100 requests per 60-second window for this key
if (!$limiter->check('api:user:123')) {
throw new RateLimitExceededException();
}
2. 服务熔断机制
<?php
/**
 * Circuit breaker guarding calls to one service.
 *
 * States: 'closed' (calls pass), 'open' (calls are rejected until the recovery
 * timeout has elapsed) and 'half-open' (one trial call decides whether to
 * close again or reopen). State lives in this object only.
 */
class CircuitBreaker
{
    /** @var string Name of the guarded service, reported by getStatus(). */
    private $service;

    /** @var int Number of consecutive failures that opens the circuit. */
    private $failureThreshold;

    /** @var int Seconds to stay open before allowing a trial call. */
    private $recoveryTimeout;

    /** @var int Consecutive failures since the last success. */
    private $failures = 0;

    /** @var int Unix timestamp of the most recent failure, 0 if none. */
    private $lastFailure = 0;

    /** @var string One of 'closed', 'open', 'half-open'. */
    private $state = 'closed';

    /**
     * @param string $service          Name of the guarded service.
     * @param int    $failureThreshold Consecutive failures needed to open the circuit.
     * @param int    $recoveryTimeout  Seconds to wait before a trial call is allowed.
     */
    public function __construct(
        string $service,
        int $failureThreshold = 3,
        int $recoveryTimeout = 30
    ) {
        $this->service = $service;
        $this->failureThreshold = $failureThreshold;
        $this->recoveryTimeout = $recoveryTimeout;
    }

    /**
     * Run the operation under circuit-breaker protection.
     *
     * @param callable $operation Called without arguments.
     *
     * @return mixed The operation's return value.
     *
     * @throws ServiceUnavailableException When the circuit is open and the
     *                                     recovery timeout has not elapsed.
     * @throws \Throwable Anything thrown by the operation is recorded as a
     *                    failure and rethrown unchanged.
     */
    public function execute(callable $operation)
    {
        if ($this->state === 'open') {
            if (time() - $this->lastFailure > $this->recoveryTimeout) {
                $this->state = 'half-open';
            } else {
                throw new ServiceUnavailableException();
            }
        }

        try {
            $result = $operation();
        } catch (\Throwable $e) {
            // Throwable rather than Exception so that Error subclasses
            // (TypeError and the like) also count as failures.
            $this->recordFailure();
            throw $e;
        }

        $this->recordSuccess();

        return $result;
    }

    /**
     * Reset the failure streak and close the circuit.
     *
     * The counter is cleared on every success, not only after a half-open
     * trial; otherwise isolated failures spread over a long period would add
     * up and eventually open the circuit.
     */
    private function recordSuccess()
    {
        $this->failures = 0;
        $this->state = 'closed';
    }

    /**
     * Count a failure and open the circuit when the threshold is reached or
     * when the failure happened during a half-open trial.
     */
    private function recordFailure()
    {
        $this->failures++;
        $this->lastFailure = time();

        if ($this->state === 'half-open' || $this->failures >= $this->failureThreshold) {
            $this->state = 'open';
        }
    }

    /**
     * Snapshot of the breaker for monitoring.
     *
     * @return array Keys: 'service', 'state', 'failures', 'last_failure'.
     */
    public function getStatus(): array
    {
        return [
            'service' => $this->service,
            'state' => $this->state,
            'failures' => $this->failures,
            'last_failure' => $this->lastFailure,
        ];
    }
}
五、性能优化策略
1. 连接池管理
<?php
/**
 * Bounded pool of HTTP client connections to a single host.
 *
 * Idle connections wait in a FIFO queue; new ones are created on demand until
 * the configured size is reached.
 */
class ConnectionPool
{
    /** @var SplQueue Idle connections ready for reuse. */
    private $pool;

    /** @var array Connection settings: 'host', 'port' and optional 'timeout'. */
    private $config;

    /** @var int Maximum number of connections created by this pool. */
    private $size;

    /** @var int Connections created by this pool and not yet discarded. */
    private $count = 0;

    /**
     * @param array $config Must contain 'host' and 'port'; 'timeout' defaults to 3.0.
     * @param int   $size   Maximum number of connections.
     */
    public function __construct(array $config, int $size = 20)
    {
        $this->config = $config;
        $this->size = $size;
        $this->pool = new SplQueue();
    }

    /**
     * Hand out an idle connection, or create one while below the size limit.
     *
     * @return mixed A connection object.
     *
     * @throws RuntimeException When no idle connection exists and the pool is at capacity.
     */
    public function getConnection()
    {
        while (!$this->pool->isEmpty()) {
            $connection = $this->pool->dequeue();

            if ($connection->isConnected()) {
                return $connection;
            }

            // The connection dropped while idle: discard it and free its slot.
            $this->forgetConnection();
        }

        if ($this->count < $this->size) {
            // Create first, count second, so a failed creation does not
            // permanently consume a slot.
            $connection = $this->createConnection();
            $this->count++;

            return $connection;
        }

        throw new RuntimeException('Connection pool exhausted');
    }

    /**
     * Return a connection to the pool, or drop it when it is no longer connected.
     *
     * NOTE(review): relies on the connection exposing isConnected(); confirm the
     * client class used by createConnection() provides it.
     *
     * @param mixed $connection A connection previously obtained from getConnection().
     */
    public function releaseConnection($connection)
    {
        if ($connection->isConnected()) {
            $this->pool->enqueue($connection);
        } else {
            $this->forgetConnection();
        }
    }

    /** Free the slot of a discarded connection without letting the counter go negative. */
    private function forgetConnection()
    {
        if ($this->count > 0) {
            $this->count--;
        }
    }

    /** Build a keep-alive HTTP client for the configured host and port. */
    private function createConnection()
    {
        $conn = new \Swoole\Coroutine\Http\Client(
            $this->config['host'],
            $this->config['port']
        );
        $conn->set([
            'timeout' => $this->config['timeout'] ?? 3.0,
            'keep_alive' => true,
        ]);

        return $conn;
    }
}
2. 异步日志处理
<?php
/**
 * Logger that queues entries on a coroutine channel and writes them to
 * gateway.log from a background coroutine.
 */
class AsyncLogger
{
    /** @var \Swoole\Coroutine\Channel Buffer of pending log entries (capacity 1000). */
    private $queue;

    /** @var mixed Value returned by go() for the writer coroutine. */
    private $worker;

    /**
     * @throws RuntimeException When gateway.log cannot be opened for appending.
     */
    public function __construct()
    {
        $this->queue = new \Swoole\Coroutine\Channel(1000);
        $this->startWorker();
    }

    /**
     * Queue a log entry for the writer coroutine.
     *
     * @param string $message Log message.
     * @param array  $context Extra data, written as JSON after the message.
     */
    public function log(string $message, array $context = [])
    {
        $logEntry = [
            'time' => microtime(true),
            'message' => $message,
            'context' => $context,
        ];
        $this->queue->push($logEntry);
    }

    /**
     * Open the log file and start the coroutine that drains the queue into it.
     */
    private function startWorker()
    {
        // Open before spawning so a failure surfaces in the constructor's caller
        // instead of inside the coroutine.
        $file = fopen('gateway.log', 'a');
        if ($file === false) {
            throw new RuntimeException('Unable to open gateway.log for appending');
        }

        $this->worker = go(function () use ($file) {
            $written = 0;

            while (true) {
                $logEntry = $this->queue->pop();
                if ($logEntry === false) {
                    // pop() yields false once the channel has been closed.
                    break;
                }

                $line = sprintf(
                    "[%s] %s %s\n",
                    // microtime(true) is a float; date() takes whole seconds.
                    date('Y-m-d H:i:s', (int) $logEntry['time']),
                    $logEntry['message'],
                    json_encode($logEntry['context'])
                );
                fwrite($file, $line);
                $written++;

                // Flush every 100 entries, and whenever the queue has been drained.
                if ($written % 100 === 0 || $this->queue->length() === 0) {
                    fflush($file);
                }
            }

            fflush($file);
            fclose($file);
        });
    }
}
六、实战案例:电商API网关
1. 商品服务路由配置
<?php
// Route registration
$router->addRoute('GET', '/api/v1/products', 'product-service', true);
$router->addRoute('GET', '/api/v1/products/{id}', 'product-service', true);
$router->addRoute('POST', '/api/v1/products', 'product-service', true);

// Rate-limit configuration
$limiter->addLimit('api:products:list', 1000, 60); // 1000 requests per minute
$limiter->addLimit('api:products:create', 100, 60); // 100 requests per minute

// Filter configuration: filters run in the order they are added
$chain = new FilterChain();
$chain->addFilter(new AuthFilter())
    ->addFilter(new RateLimitFilter($limiter))
    ->addFilter(new LoggingFilter($logger));

// Request handling
// Closures do not see outer variables unless they are imported with `use`,
// so $router and $pool must be passed down explicitly.
$app->get('/api/v1/products', function (Request $request) use ($chain, $router, $pool) {
    return $chain->setHandler(function ($request) use ($router, $pool) {
        $route = $router->match($request->getMethod(), $request->getPathInfo());

        // match() returns null when no route applies; answer 404 instead of
        // dereferencing null.
        if ($route === null) {
            return new Response(404, ['Content-Type' => 'application/json'], [
                'code' => 404,
                'message' => 'Not Found',
            ]);
        }

        $client = $pool->getConnection($route['service']);

        try {
            // Forward the request to the backend service
            $response = $client->request($request);

            return new Response(
                $response->getStatusCode(),
                $response->getHeaders(),
                $response->getBody()
            );
        } finally {
            // Always hand the connection back, even when forwarding fails.
            $pool->releaseConnection($client);
        }
    })->run($request);
});
2. 聚合API实现
<?php
/**
 * Aggregate product, inventory and review data for one product.
 *
 * The three backend calls are issued concurrently and merged into one array.
 *
 * @param Request $request Request carrying the product id in its 'id' attribute.
 *
 * @return array Keys: 'id', 'title', 'price', 'stock', 'rating', 'reviews'
 *               (at most the first five reviews).
 *
 * @throws ServiceCallException When any of the three calls yields no usable result.
 */
public function getProductDetail(Request $request)
{
    $productId = $request->getAttribute('id');

    // The id originates from the request URL; encode it before placing it in
    // another URL so characters such as '?' or '&' cannot alter the target.
    $encodedId = rawurlencode((string) $productId);

    // Call the services in parallel
    $results = \Swoole\Coroutine\batch([
        'product' => function () use ($encodedId) {
            return $this->callService('product-service', "/products/{$encodedId}");
        },
        'inventory' => function () use ($encodedId) {
            return $this->callService('inventory-service', "/stocks/{$encodedId}");
        },
        'reviews' => function () use ($encodedId) {
            return $this->callService('review-service', "/reviews?product_id={$encodedId}");
        },
    ]);

    // A task that did not complete leaves a non-array value in its slot;
    // fail explicitly rather than reading offsets of null below.
    foreach (['product', 'inventory', 'reviews'] as $part) {
        if (!isset($results[$part]) || !is_array($results[$part])) {
            throw new ServiceCallException("Service call for {$part} returned no result");
        }
    }

    // Aggregate the data
    return [
        'id' => $productId,
        'title' => $results['product']['title'],
        'price' => $results['product']['price'],
        'stock' => $results['inventory']['quantity'],
        'rating' => $this->calculateRating($results['reviews']),
        'reviews' => array_slice($results['reviews'], 0, 5),
    ];
}
/**
 * Perform a GET request against a backend service and decode its JSON body.
 *
 * @param string $service Service name, passed to the pool and used in error messages.
 * @param string $path    Request path including any query string.
 *
 * @return array Decoded JSON body.
 *
 * @throws ServiceCallException When the status is not 200 or the body does not
 *                              decode to an array.
 */
private function callService(string $service, string $path): array
{
    $client = $this->pool->getConnection($service);

    try {
        $response = $client->get($path);
    } finally {
        // Return the connection even when the request throws, so failures
        // cannot drain the pool.
        $this->pool->releaseConnection($client);
    }

    if ($response->getStatusCode() !== 200) {
        throw new ServiceCallException("Service {$service} error");
    }

    $payload = json_decode((string) $response->getBody(), true);

    // json_decode() yields null for malformed JSON and scalars for scalar
    // documents; neither satisfies the array return type.
    if (!is_array($payload)) {
        throw new ServiceCallException("Service {$service} returned an invalid JSON body");
    }

    return $payload;
}