最近手里的一个项目需要接AI对话能力,用户提问题、后端调大模型、再把结果一个字一个字蹦出来——就是那种打字机效果。市面上现成的方案要么太贵,要么绑定了特定平台。索性基于ThinkPHP 8.0自己撸了一个流式对话网关,带限流、带缓存、支持SSE推流,跑了两周很稳定,踩过的坑和完整代码都整理出来了。
这篇文章不会跟你讲”什么是ThinkPHP”或者”MVC是啥”,默认你已经有TP8的基础。咱们直奔主题,把中间件、流式响应、Redis限流、前端对接这几块串起来,形成一个能直接拿去用的方案。
一、整体思路
先花两分钟理清我们要做什么:
- 用户通过前端页面输入问题,前端用EventSource(SSE)连到后端接口
- 后端收到请求后,先过一层限流中间件——同一个IP在60秒内最多调用20次
- 通过限流后,查一下Redis缓存,看有没有现成的答案(相同或高度相似的问题)
- 缓存没命中就走DeepSeek的流式API,把模型返回的token逐个推给前端
- 整个对话记录异步写入数据库,方便后续分析和统计
技术栈:ThinkPHP 8.0 + Redis + MySQL + DeepSeek API(你可以换成OpenAI或其他兼容接口的模型)。
说一下为什么用SSE而不是WebSocket。这个场景是单向推送——服务端往客户端推数据,客户端不需要主动发消息(除了最初的请求)。SSE比WebSocket轻得多,浏览器原生支持,不需要额外的心跳机制,部署也简单。如果你的场景是双向实时通信,那还是得上WebSocket。
二、环境与前置准备
2.1 基础环境
- PHP 8.1+(TP8要求的最低版本)
- Redis 6.0+(用于限流计数和问答缓存)
- MySQL 5.7+(存对话记录)
- Composer 2.x
2.2 安装ThinkPHP 8.0
如果你还没有TP8项目,直接composer创建:
composer create-project topthink/think tp8-ai-gateway
cd tp8-ai-gateway
项目跑起来后,先把基础配置搞定。打开.env文件,确认Redis和数据库的连接信息:
# .env
APP_DEBUG = true
[REDIS]
REDIS_HOST = 127.0.0.1
REDIS_PORT = 6379
REDIS_PASSWORD =
REDIS_SELECT = 0
[DATABASE]
DB_TYPE = mysql
DB_HOST = 127.0.0.1
DB_NAME = ai_gateway
DB_USER = root
DB_PASS = your_password
DB_PORT = 3306
DB_CHARSET = utf8mb4
配置读进系统后,config/redis.php和config/database.php会自动从env取值,不需要手动改。
2.3 安装额外的依赖
我们需要用Guzzle来调DeepSeek的API(当然你也可以用curl,但Guzzle在处理流式响应时更省心):
composer require guzzlehttp/guzzle
到这一步环境就准备好了。
三、数据库设计
建一张对话记录表,字段不用太复杂,够用就行:
CREATE TABLE `chat_records` (
`id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
`session_id` VARCHAR(64) NOT NULL COMMENT '会话标识',
`user_ip` VARCHAR(45) NOT NULL COMMENT '用户IP',
`question` TEXT NOT NULL COMMENT '用户问题',
`answer` MEDIUMTEXT NOT NULL COMMENT 'AI回复',
`model` VARCHAR(50) DEFAULT 'deepseek-chat' COMMENT '调用的模型',
`token_count` INT UNSIGNED DEFAULT 0 COMMENT '消耗的token数',
`response_ms` INT UNSIGNED DEFAULT 0 COMMENT '响应耗时(毫秒)',
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_session_id (`session_id`),
INDEX idx_user_ip (`user_ip`),
INDEX idx_created_at (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AI对话记录表';
session_id用来串联同一轮对话的上下文,这个后面控制器里会生成。
四、核心实现:限流中间件
限流方案选了滑动窗口算法,基于Redis的ZSET实现。为什么不用固定窗口?固定窗口在窗口交界处会有一个明显的漏洞——比如每分钟限制100次,用户在59秒到61秒这两秒内可以发出200次请求(前一个窗口100次+后一个窗口100次)。滑动窗口就没这个毛病。
在TP8里创建一个中间件非常方便,命令行一行搞定:
php think make:middleware RateLimit
这会在app/middleware/下生成RateLimit.php。我们把逻辑填进去:
<?php
declare(strict_types=1);
namespace appmiddleware;
use thinkfacadeCache;
use thinkRequest;
use thinkResponse;
class RateLimit
{
/**
* 60秒内的最大请求次数
*/
protected int $maxRequests = 20;
/**
* 时间窗口(秒)
*/
protected int $windowSeconds = 60;
/**
* Redis key前缀
*/
protected string $keyPrefix = 'rate_limit:';
public function handle(Request $request, callable $next): Response
{
$ip = $request->ip();
$key = $this->keyPrefix . $ip;
$now = microtime(true);
// 拿到Redis实例(TP8的Cache门面如果配置了redis则直接操作redis)
$redis = Cache::store('redis')->handler();
// 移除窗口之外的历史记录
$windowStart = $now - $this->windowSeconds;
$redis->zRemRangeByScore($key, '-inf', $windowStart);
// 统计当前窗口内的请求次数
$currentCount = $redis->zCard($key);
if ($currentCount >= $this->maxRequests) {
// 计算需要等待多少秒才能重试
$oldestTimestamp = $redis->zRange($key, 0, 0, true);
$oldestScore = !empty($oldestTimestamp) ? reset($oldestTimestamp) : $now;
$retryAfter = (int)ceil($oldestScore + $this->windowSeconds - $now);
return Response::create([
'code' => 429,
'message' => "请求过于频繁,请在{$retryAfter}秒后重试",
'data' => null,
], 'json', 429)->header([
'X-RateLimit-Limit' => $this->maxRequests,
'X-RateLimit-Remaining' => 0,
'X-RateLimit-Reset' => $retryAfter,
'Retry-After' => $retryAfter,
]);
}
// 记录本次请求
$redis->zAdd($key, $now, uniqid('req_', true));
// 给key设个过期时间,防止冷数据堆积
$redis->expire($key, $this->windowSeconds * 2);
// 把剩余次数写进请求属性里,控制器如果需要可以取用
$request->rateLimitRemaining = $this->maxRequests - $currentCount - 1;
return $next($request);
}
}
有几个细节说一下。第一,Cache::store('redis')->handler()拿到的就是原生的Redis对象(TP8底层用的是thinkcachedriverRedis),可以直接调ZSET相关方法。第二,zAdd的时候我用uniqid做member是为了避免极端并发下member重复导致计数不准,实际测试下来这个方案在单机千级QPS下表现稳定。第三,限流触发时返回的响应头里带了X-RateLimit-*系列信息,前端可以根据这些做友好提示。
中间件写好后需要注册。打开app/middleware.php,加入全局中间件或者路由中间件。我建议只对AI对话接口做限流,所以用路由中间件的方式:
// app/middleware.php
return [
// 全局中间件
appmiddlewareRateLimit::class,
];
当然你也可以在路由定义里按需挂载,看项目实际情况。
五、核心实现:流式对话控制器
这是整个方案的重头戏。先创建控制器:
php think make:controller Chat
然后打开app/controller/Chat.php,开始写逻辑。这里我会把代码拆成几个方法,方便维护和测试。
<?php
declare(strict_types=1);
namespace appcontroller;
use thinkfacadeCache;
use thinkfacadeDb;
use thinkRequest;
use GuzzleHttpClient;
use GuzzleHttpExceptionGuzzleException;
class Chat
{
/**
* DeepSeek API地址
*/
protected string $apiBaseUrl = 'https://api.deepseek.com/v1';
/**
* API Key,建议放config里或env里
*/
protected string $apiKey;
/**
* 使用的模型
*/
protected string $model = 'deepseek-chat';
/**
* 缓存过期时间(秒)
*/
protected int $cacheTtl = 3600;
/**
* 构造函数,从配置读取API Key
*/
public function __construct()
{
$this->apiKey = config('app.deepseek_api_key') ?: '';
if (empty($this->apiKey)) {
throw new RuntimeException('DeepSeek API Key未配置,请在config/app.php中设置deepseek_api_key');
}
}
/**
* 流式对话入口
*/
public function stream(Request $request)
{
$question = trim($request->param('question', ''));
$sessionId = $request->param('session_id', '');
if (empty($question)) {
return json([
'code' => 400,
'message' => '问题不能为空',
'data' => null,
])->code(400);
}
// 生成或沿用session_id
if (empty($sessionId)) {
$sessionId = $this->generateSessionId($request->ip());
}
// 先查缓存
$cacheKey = 'ai_answer:' . md5(mb_strtolower($question));
$cachedAnswer = Cache::store('redis')->get($cacheKey);
if ($cachedAnswer !== null) {
// 缓存命中,直接用SSE格式返回
return $this->sendCachedResponse($cachedAnswer, $sessionId, $question);
}
// 缓存未命中,调大模型
return $this->streamFromApi($question, $sessionId, $cacheKey);
}
/**
* 生成会话ID
*/
protected function generateSessionId(string $ip): string
{
return md5($ip . uniqid('', true) . time());
}
/**
* 从缓存返回结果(也走SSE格式,保持前端处理统一)
*/
protected function sendCachedResponse(string $cachedAnswer, string $sessionId, string $question)
{
// 设置SSE响应头
header('Content-Type: text/event-stream; charset=utf-8');
header('Cache-Control: no-cache, no-store, must-revalidate');
header('X-Accel-Buffering: no');
header('Connection: keep-alive');
// 关闭输出缓冲
if (ob_get_level()) {
ob_end_clean();
}
// 模拟流式输出(把缓存的内容分块推送,保持打字机效果)
$chunks = $this->splitIntoChunks($cachedAnswer, 3);
foreach ($chunks as $chunk) {
echo "data: " . json_encode([
'content' => $chunk,
'session_id' => $sessionId,
'done' => false,
], JSON_UNESCAPED_UNICODE) . "nn";
if (ob_get_level()) {
ob_flush();
}
flush();
usleep(30000); // 30ms间隔,模拟打字效果
}
// 发送结束标记
echo "data: " . json_encode([
'content' => '',
'session_id' => $sessionId,
'done' => true,
'from_cache' => true,
], JSON_UNESCAPED_UNICODE) . "nn";
if (ob_get_level()) {
ob_flush();
}
flush();
return response()->content('');
}
/**
* 从远程API获取流式响应并转发
*/
protected function streamFromApi(string $question, string $sessionId, string $cacheKey)
{
$startTime = microtime(true);
// 设置SSE响应头
header('Content-Type: text/event-stream; charset=utf-8');
header('Cache-Control: no-cache, no-store, must-revalidate');
header('X-Accel-Buffering: no');
header('Connection: keep-alive');
// 关闭输出缓冲
if (ob_get_level()) {
ob_end_clean();
}
$fullAnswer = '';
$tokenCount = 0;
try {
$client = new Client([
'timeout' => 120,
'headers' => [
'Authorization' => 'Bearer ' . $this->apiKey,
'Content-Type' => 'application/json',
'Accept' => 'text/event-stream',
],
]);
$response = $client->post($this->apiBaseUrl . '/chat/completions', [
'json' => [
'model' => $this->model,
'messages' => [
['role' => 'user', 'content' => $question],
],
'stream' => true,
'max_tokens' => 2048,
'temperature' => 0.7,
],
'stream' => true,
'timeout' => 120,
]);
$body = $response->getBody();
while (!$body->eof()) {
$line = $this->readLineFromStream($body);
if (empty($line) || !str_starts_with($line, 'data: ')) {
continue;
}
$jsonStr = substr($line, 6); // 去掉 "data: " 前缀
if ($jsonStr === '[DONE]') {
break;
}
$data = json_decode($jsonStr, true);
if ($data === null) {
continue;
}
$delta = $data['choices'][0]['delta']['content'] ?? '';
if ($delta === '') {
continue;
}
$fullAnswer .= $delta;
$tokenCount++;
// 推送增量内容给前端
echo "data: " . json_encode([
'content' => $delta,
'session_id' => $sessionId,
'done' => false,
], JSON_UNESCAPED_UNICODE) . "nn";
if (ob_get_level()) {
ob_flush();
}
flush();
}
$body->close();
} catch (GuzzleException $e) {
// 出错时推送错误信息
echo "data: " . json_encode([
'content' => '',
'session_id' => $sessionId,
'done' => true,
'error' => '模型调用失败: ' . $e->getMessage(),
], JSON_UNESCAPED_UNICODE) . "nn";
if (ob_get_level()) {
ob_flush();
}
flush();
return response()->content('');
}
$elapsedMs = (int)((microtime(true) - $startTime) * 1000);
// 发送结束标记
echo "data: " . json_encode([
'content' => '',
'session_id' => $sessionId,
'done' => true,
'from_cache' => false,
], JSON_UNESCAPED_UNICODE) . "nn";
if (ob_get_level()) {
ob_flush();
}
flush();
// 写入缓存和数据库(用fastcgi_finish_request优化,不阻塞响应)
if (!empty($fullAnswer)) {
// 缓存
Cache::store('redis')->set($cacheKey, $fullAnswer, $this->cacheTtl);
// 异步写库(这里用简单的insert,高并发场景建议走队列)
Db::table('chat_records')->insert([
'session_id' => $sessionId,
'user_ip' => request()->ip(),
'question' => $question,
'answer' => $fullAnswer,
'model' => $this->model,
'token_count' => $tokenCount,
'response_ms' => $elapsedMs,
'created_at' => date('Y-m-d H:i:s'),
]);
}
return response()->content('');
}
/**
* 从流中按行读取
*/
protected function readLineFromStream($body): string
{
$line = '';
while (!$body->eof()) {
$char = $body->read(1);
if ($char === "n") {
break;
}
if ($char === "r") {
continue;
}
$line .= $char;
}
return trim($line);
}
/**
* 将文本按字符数分块
*/
protected function splitIntoChunks(string $text, int $charsPerChunk): array
{
$chunks = [];
$len = mb_strlen($text);
for ($i = 0; $i < $len; $i += $charsPerChunk) {
$chunks[] = mb_substr($text, $i, $charsPerChunk);
}
return $chunks;
}
}
上面这段代码是控制器的完整实现,有几个点展开说一下:
关于输出缓冲的处理——PHP默认会开启output buffering,而Nginx也可能有代理缓冲。如果不处理,SSE的逐条推送会被缓冲成一大块再发给客户端,打字机效果就没了。代码里先ob_end_clean()关掉PHP层的缓冲,再通过header('X-Accel-Buffering: no')告诉Nginx不要缓冲这个请求。如果你的项目前面还套了Cloudflare或CDN,也需要确认它们对SSE的处理策略。
关于readLineFromStream——Guzzle的stream模式需要自己按行解析。SSE协议规定每条消息以data:开头、以两个换行结束。DeepSeek的流式API遵守这个规范,所以我们逐字符读取直到遇到换行符,然后解析JSON。
关于缓存写入——这里把完整答案缓存起来,相同问题下次直接走缓存。实际生产环境中可以考虑用语义相似度来判断问题是否”相同”,简单用MD5在大多数场景也够用。另外写入数据库的操作在响应完成后执行,不会拖慢用户感知的响应速度。高并发场景建议把写库操作丢进消息队列。
六、路由配置
在route/app.php中加上对话接口的路由:
<?php
use thinkfacadeRoute;
Route::get('chat/stream', 'Chat/stream');
注意这里用了GET方法,因为前端EventSource只支持GET请求。如果你的业务场景需要POST(比如问题很长),可以考虑用fetch+ReadableStream的方案,这个后面会提到替代思路。
七、配置API Key
在config/app.php里加上DeepSeek的API Key配置项:
// config/app.php 在数组末尾追加
'deepseek_api_key' => env('DEEPSEEK_API_KEY', ''),
然后在.env里设置实际的值:
DEEPSEEK_API_KEY = sk-xxxxxxxxxxxxxxxxxxxx
不要直接把Key硬编码在控制器里,这是基本原则。用env管理方便不同环境切换。
八、前端对接
前端用原生EventSource就够了,不需要额外引入任何库。给一个最简可用的实现:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI对话测试</title>
</head>
<body>
<div>
<textarea id="questionInput" rows="4" cols="60" placeholder="请输入你的问题..."></textarea>
<br>
<button id="sendBtn">发送</button>
<span id="statusText">就绪</span>
</div>
<hr>
<div id="answerBox" style="white-space:pre-wrap;font-size:15px;line-height:1.8;min-height:200px;border:1px solid #ddd;padding:16px;margin-top:10px;background:#fafafa;">
等待输入...
</div>
<script>
let currentEventSource = null;
document.getElementById('sendBtn').addEventListener('click', function() {
const question = document.getElementById('questionInput').value.trim();
if (!question) {
alert('请输入问题');
return;
}
// 如果上一次请求还在进行中,先中断
if (currentEventSource) {
currentEventSource.close();
currentEventSource = null;
}
const answerBox = document.getElementById('answerBox');
const statusText = document.getElementById('statusText');
answerBox.textContent = '';
statusText.textContent = '请求中...';
document.getElementById('sendBtn').disabled = true;
// 生成或复用session_id(实际项目可存localStorage)
const sessionId = 'sess_' + Date.now() + '_' + Math.random().toString(36).substring(2, 10);
const url = '/chat/stream?question=' + encodeURIComponent(question) + '&session_id=' + sessionId;
currentEventSource = new EventSource(url);
let fullContent = '';
currentEventSource.addEventListener('message', function(event) {
try {
const data = JSON.parse(event.data);
if (data.error) {
answerBox.textContent = '错误: ' + data.error;
statusText.textContent = '出错';
currentEventSource.close();
currentEventSource = null;
document.getElementById('sendBtn').disabled = false;
return;
}
if (data.done) {
statusText.textContent = data.from_cache ? '完成(来自缓存)' : '完成';
currentEventSource.close();
currentEventSource = null;
document.getElementById('sendBtn').disabled = false;
return;
}
fullContent += data.content;
answerBox.textContent = fullContent;
statusText.textContent = '接收中...';
} catch (e) {
console.error('解析SSE数据失败:', e);
}
});
currentEventSource.addEventListener('error', function(event) {
if (currentEventSource && currentEventSource.readyState === EventSource.CLOSED) {
statusText.textContent = '连接已关闭';
} else {
statusText.textContent = '连接异常,请重试';
}
document.getElementById('sendBtn').disabled = false;
currentEventSource = null;
});
});
</script>
</body>
</html>
把上面这个HTML放到TP8的public/目录下或者通过控制器渲染都可以。核心逻辑就三个:创建EventSource、监听message事件拼接内容、处理done和error。实际项目中建议把session_id存到localStorage里,这样刷新页面后还能继续之前的对话上下文(需要后端配合查历史记录)。
九、测试与调优
启动TP8内置服务器:
php think run -p 8080
浏览器打开http://localhost:8080/chat.html(假设前端文件叫chat.html),输入一个问题,应该能看到AI回复逐字蹦出来。
9.1 限流测试
用ab或者直接狂点发送按钮,在60秒内超过20次请求后应该收到429响应。浏览器开发者工具的Network标签里能看到返回的X-RateLimit-Remaining头部逐次递减。
9.2 缓存验证
问一个之前问过的完全相同的问题,第二次响应会快很多,而且SSE的结束消息里from_cache字段是true。用redis-cli查看:
redis-cli
> KEYS ai_answer:*
> GET ai_answer:xxxxx
能看到缓存key和对应的完整答案。
9.3 可能踩的坑
- Nginx缓冲问题:如果部署在Nginx后面,记得在location配置里加上
proxy_buffering off;和proxy_cache off;,否则SSE会被缓冲。 - PHP执行时间:流式响应的持续时间可能超过默认的30秒
max_execution_time,建议在控制器开头加set_time_limit(120);或调整php.ini。 - 内存限制:长对话可能产生大量token,注意
memory_limit的设置,一般128M足够。 - EventSource的GET限制:URL长度有限制(浏览器通常限制2KB左右),如果用户问题非常长,建议改用fetch+ReadableStream方案,可以发POST请求。
9.4 性能优化建议
上线前可以考虑这些优化:
- 把写库操作从控制器里拆出来,丢进Redis队列,用另一个消费者进程异步写入。TP8支持队列,用
think-queue扩展即可。 - 对高频问题做预热缓存,比如把历史Top100的问题提前算好存进Redis。
- 如果调用的是付费API,加一个每日总额度限制,防止被刷爆。
- 考虑加上语义相似度匹配,用简单的余弦相似度或调用embedding接口,让相似问题也能命中缓存。
十、替代方案:fetch + ReadableStream
前面提到EventSource只支持GET,如果业务必须用POST(比如携带大量上下文消息),可以用下面的前端方案替代:
async function fetchStream(url, body) {
const response = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('n');
buffer = lines.pop() || ''; // 保留最后一个不完整的行
for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonStr = line.slice(6);
if (jsonStr === '[DONE]') return;
const data = JSON.parse(jsonStr);
// 处理data...
}
}
}
}
后端对应的接口改成POST即可,其他逻辑不变。ReadableStream的浏览器兼容性已经很好,主流浏览器都支持。
十一、写在最后
这套方案从写第一行代码到上线跑了大概三天,中间踩的主要是输出缓冲和Nginx代理缓冲的坑。整体来说ThinkPHP 8.0在构建这类实时流式接口时表现挺稳的,中间件机制让限流逻辑很干净地解耦出来,不会污染业务代码。
如果你打算基于这个方案做二次开发,几个方向可以参考:
- 加上多轮对话上下文管理(把历史消息存Redis,每次请求带上)
- 接入多个模型做负载均衡或AB对比
- 用WebSocket替代SSE实现真正的双向通信
- 做一套简单的管理后台,查看调用统计和费用
代码都贴在文章里了,直接复制就能跑。有任何问题欢迎在评论区讨论,我看到了会回复。

