Python 异步编程与FastAPI流式响应:构建实时数据管道

2026-04-25 0 448

2025年,异步编程已经成为Python高性能服务的基石。FastAPI作为新兴的异步Web框架,结合Python的 async/await 和异步生成器,可以轻松构建流式数据管道。本文通过三个完整案例,带你掌握流式响应的核心技巧。


1. 为什么需要流式响应?

传统Web API一次性返回完整响应,对于大文件、实时数据或长耗时任务,会导致客户端长时间等待和内存暴增。流式响应(Streaming Response)允许服务器逐块发送数据,客户端可以边接收边处理。典型场景包括:

  • 大文件下载(视频、日志)
  • AI对话流式输出(类似ChatGPT打字机效果)
  • 实时数据推送(股票行情、监控指标)
  • 数据库游标逐行返回

2. 核心基础:异步生成器

Python 3.6+ 支持异步生成器(async def + yield),它是流式响应的基石。与普通生成器不同,异步生成器可以在每次 yield 之间执行 await 操作。

import asyncio

# 异步生成器:每隔1秒产生一个数字
async def number_stream(max_num):
    for i in range(max_num):
        await asyncio.sleep(1)  # 模拟异步IO
        yield i

# 消费异步生成器
async def main():
    async for num in number_stream(5):
        print(f"收到: {num}")

asyncio.run(main())

异步生成器自动实现了 __aiter____anext__ 协议,可以直接用于 async for 循环。


3. 实战案例一:FastAPI流式文本响应

FastAPI 内置了 StreamingResponse,可以接收一个异步生成器作为响应体。下面实现一个模拟的实时日志流API。

# 安装: pip install fastapi uvicorn

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import time

app = FastAPI()

async def log_generator():
    """模拟实时日志生成"""
    for i in range(20):
        await asyncio.sleep(0.3)  # 每300ms产生一条日志
        timestamp = time.strftime("%H:%M:%S")
        yield f"[{timestamp}] 日志条目 #{i+1}: 系统运行正常n"

@app.get("/logs/stream")
async def stream_logs():
    """返回流式日志响应"""
    return StreamingResponse(
        log_generator(),
        media_type="text/plain",
        headers={
            "X-Stream-Id": "demo-log-stream",
            "Cache-Control": "no-cache",
        }
    )

# 运行: uvicorn main:app --reload
# 访问 http://localhost:8000/logs/stream 即可看到逐行输出的日志

关键点:

  • StreamingResponse 第一个参数可以是异步生成器或同步生成器
  • 设置 media_type 告诉客户端内容类型
  • 添加自定义header提供元信息

4. 实战案例二:Server-Sent Events (SSE) 实时推送

SSE 是一种轻量级的服务器推送协议,使用 text/event-stream 格式。FastAPI 结合异步生成器可以完美实现SSE。

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def event_generator(request: Request):
    """生成SSE事件流"""
    counter = 0
    while True:
        # 检查客户端是否断开连接
        if await request.is_disconnected():
            break
        
        counter += 1
        data = {
            "id": counter,
            "message": f"事件 #{counter}",
            "timestamp": asyncio.get_event_loop().time()
        }
        
        # SSE格式: data: {json}nn
        yield f"data: {json.dumps(data)}nn"
        
        await asyncio.sleep(1)  # 每秒推送一次
        
        if counter >= 10:  # 推送10次后结束
            yield "event: donendata: 流结束nn"
            break

@app.get("/events")
async def sse_endpoint(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用Nginx缓冲
        }
    )

# 前端使用 EventSource 接收:
# const evtSource = new EventSource('/events');
# evtSource.onmessage = (e) => console.log(JSON.parse(e.data));

SSE格式规范:

  • 每行以 data: 开头,后面跟数据
  • 事件之间用两个换行符 nn 分隔
  • 支持 event: 自定义事件类型,id: 设置最后事件ID

5. 实战案例三:大文件流式下载

当需要提供大文件下载时(如视频、数据集),流式响应可以避免占用服务器内存。下面实现一个模拟的大文件流式下载。

import os
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def file_chunk_generator(file_path: str, chunk_size: int = 1024 * 1024):
    """异步逐块读取文件"""
    # 使用异步文件操作 (需要 aiofiles 库)
    # 这里用同步方式演示,生产环境建议 aiofiles
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk
            # 模拟异步让出控制权
            await asyncio.sleep(0)

@app.get("/download/sample")
async def download_sample():
    file_path = "large_sample.dat"  # 假设存在一个文件
    
    # 获取文件大小
    file_size = os.path.getsize(file_path)
    
    return StreamingResponse(
        file_chunk_generator(file_path),
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="sample.dat"',
            "Content-Length": str(file_size),
        }
    )

# 生产环境推荐使用 aiofiles 实现真正的异步文件IO:
# pip install aiofiles
# async with aiofiles.open(file_path, "rb") as f:
#     while chunk := await f.read(chunk_size):
#         yield chunk

6. 进阶:流式响应中的错误处理与取消

当客户端断开连接时,服务器应该停止生成数据,避免资源浪费。FastAPI 提供了 request.is_disconnected() 方法检测客户端状态。

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def safe_generator(request: Request):
    try:
        for i in range(100):
            # 检查断开
            if await request.is_disconnected():
                print(f"客户端断开,停止生成")
                break
            
            yield f"data: 第{i+1}条nn"
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("生成器被取消")
        # 清理资源
    finally:
        print("生成器结束")

@app.get("/safe-stream")
async def safe_stream(request: Request):
    return StreamingResponse(
        safe_generator(request),
        media_type="text/event-stream"
    )

此外,可以使用 asyncio.CancelledError 捕获任务取消事件,确保资源正确释放。


7. 性能对比:流式 vs 非流式

指标 传统JSON响应 流式响应
首字节时间 (TTFB) 需等待全部数据准备完成 几乎即时(生成第一块即发送)
内存占用 全部数据加载到内存 仅保留当前块(通常几KB)
客户端体验 等待完整响应后展示 边接收边展示,延迟低
适用场景 小数据量、低延迟要求不高 大数据、实时性要求高

8. 最佳实践总结

  • 始终检查客户端断开:使用 request.is_disconnected() 避免无效计算
  • 控制生产速度:如果消费者比生产者慢,考虑背压(backpressure)机制
  • 使用合适的chunk大小:文件下载建议 1MB-8MB,SSE事件建议单个事件不要过大
  • 设置正确的Content-Type:流式文本用 text/plain,SSE用 text/event-stream
  • 配合反向代理:Nginx 需要设置 proxy_buffering off; 否则会缓冲整个响应
# Nginx 配置示例
location /events {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection '';
    chunked_transfer_encoding on;
}

9. 总结

通过本文的案例,你掌握了Python异步生成器的核心用法,以及如何在FastAPI中实现三种流式响应:

  • 文本流(实时日志)
  • SSE事件流(实时推送)
  • 大文件流(高效下载)

流式响应是构建高性能、低延迟Web服务的关键技术。结合Python的异步生态,你可以轻松应对各种实时数据处理场景。


本文原创,基于Python 3.12 + FastAPI 0.110+。所有代码均经过测试,可直接运行。

Python 异步编程与FastAPI流式响应:构建实时数据管道
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python 异步编程与FastAPI流式响应:构建实时数据管道 https://www.taomawang.com/server/python/1745.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务