当处理实时数据流——比如日志监控、消息队列消费、或者大文件的分块读取时,同步生成器很容易成为I/O瓶颈。Python 的异步生成器结合 asyncio 提供了既能惰性产生数据、又能非阻塞等待外部资源的解决方案。本文通过一个完整的“实时日志收集与分发”案例,把 async yield、async for、asyncio.Queue 以及背压控制串成一条可落地的处理管线。
一、为什么需要异步生成器
普通生成器用 yield 返回值,适合一次性准备好的集合。但数据源来自网络、文件或定时轮询时,生成过程的每一步都可能阻塞。传统做法是用 async/await 写协程函数,但协程函数返回的是单个协程,无法在遍历中边产生边等待。
异步生成器(PEP 525)解决了这个问题:它可以在产生值的间隙中使用 await,让出控制权给事件循环。调用方使用 async for 迭代,等价于在每次循环中等待一个异步步骤。
二、基础语法快速回顾
定义一个异步生成器函数:
import asyncio
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.5) # 模拟异步I/O
yield i
使用 async for 进行迭代:
async def main():
async for num in async_range(3):
print(num)
asyncio.run(main())
异步生成器对象的 __anext__ 返回一个可等待对象,async for 内部会自动处理 StopAsyncIteration。接下来我们把这个基础能力应用到实战的流处理中。
三、案例场景:实时日志监控与分发
假设我们要监控某个应用日志文件,每当有新行写入时,就将该行解析后异步发送到两个地方:一个Webhook服务和一个本地数据库。为了实现高吞吐且不丢失数据,我们设计一条由三个异步生成器串联的管线:
- 日志读取器:异步生成器,跟随文件末尾(类似
tail -f),产生原始日志行。 - 解析器:异步生成器,接收原始行,解析出结构化数据,过滤无效行。
- 分发器:异步生成器,将结构化数据发送到两个下游服务,并处理失败重试。
管线之间通过 asyncio.Queue 传递数据,实现解耦和背压。
四、实现异步日志读取器
import asyncio
import aiofiles # 假设已安装
async def async_log_tailer(filepath: str, poll_interval: float = 0.2):
"""异步生成器:持续读取文件新增内容,产生日志行。"""
async with aiofiles.open(filepath, 'r') as f:
# 跳到文件末尾
await f.seek(0, 2)
while True:
line = await f.readline()
if line:
yield line.rstrip('n')
else:
await asyncio.sleep(poll_interval)
这里用了 aiofiles 来异步读取文件。循环中如果没有新数据就睡眠一小段时间,避免空转耗 CPU。这个生成器会无限产出日志行,直到被外部取消。
五、构建解析器与分发器
解析器接收原始行,提取日志级别、时间戳和消息正文:
import re
from typing import Optional
LOG_PATTERN = re.compile(r'(d+-d+-d+ d+:d+:d+) | (w+) | (.+)')
async def log_parser(source):
"""异步生成器:从任意异步可迭代对象中读取原始行,输出解析后的字典。"""
async for raw_line in source:
if not raw_line:
continue
m = LOG_PATTERN.match(raw_line)
if m:
yield {
'timestamp': m.group(1),
'level': m.group(2),
'message': m.group(3),
}
# 格式不匹配的行直接丢弃
分发器接收解析后的数据,并发发送到两个下游:
async def distributor(source, max_concurrency=5):
"""异步生成器:消费解析数据,并发发送到多个目标,输出发送结果。"""
sem = asyncio.Semaphore(max_concurrency)
async def send_to_target(target_func, data):
async with sem:
try:
# target_func 是异步发送函数,例如 aiohttp 的 post
await asyncio.wait_for(target_func(data), timeout=5.0)
return True
except Exception:
return False
async for record in source:
# 发送到两个目标,可并发
tasks = [
send_to_target(send_to_webhook, record),
send_to_target(write_to_local_db, record),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
yield {'record': record, 'webhook_ok': results[0], 'db_ok': results[1]}
这里使用了 Semaphore 限制并发数,避免瞬间创建过多连接。如果下游处理慢,队列会自然形成背压。
六、用队列组装管线并处理背压
将三个异步生成器用 asyncio.Queue 连接起来:
async def pipeline(filepath: str):
raw_queue = asyncio.Queue(maxsize=1000) # 限制容量,实现背压
parsed_queue = asyncio.Queue(maxsize=500)
async def read_to_queue():
async for line in async_log_tailer(filepath):
await raw_queue.put(line)
async def parse_to_queue():
async for record in log_parser(raw_queue_generator(raw_queue)):
await parsed_queue.put(record)
async def distribute_from_queue():
async for result in distributor(parsed_queue_generator(parsed_queue)):
# 可以做最后的收集,例如统计成功失败数
print(f"处理完成: {result['record']['message'][:50]}...")
# 将生成器包装为可从队列中 async for 迭代的对象
# 简单实现如下:
async def raw_queue_generator(q):
while True:
item = await q.get()
yield item
q.task_done()
async def parsed_queue_generator(q):
while True:
item = await q.get()
yield item
q.task_done()
# 并发执行三个阶段
tasks = [
asyncio.create_task(read_to_queue()),
asyncio.create_task(parse_to_queue()),
asyncio.create_task(distribute_from_queue()),
]
await asyncio.gather(*tasks)
关键在于队列的 maxsize 参数。当分发器消费速度跟不上日志产生速度时,解析器向 parsed_queue 放入数据会被阻塞,进而使得读取器向 raw_queue 放入也会阻塞,形成自然反压,避免内存无限增长。这就是异步队列对流控的核心价值。
七、优雅关闭与取消
实际运行中需要处理 Ctrl+C 或者服务关闭信号。可以让管线感知取消事件,清理资源:
async def pipeline(filepath: str):
shutdown_event = asyncio.Event()
# 在信号处理中设置 shutdown_event
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, shutdown_event.set)
# 修改 read_to_queue,使其在 shutdown 时退出
async def read_to_queue():
async for line in async_log_tailer(filepath):
if shutdown_event.is_set():
break
await raw_queue.put(line)
# ... 其余类似处理,最后 gather 时会自然退出
异步生成器的 aclose() 方法可以在外部取消时抛出 GeneratorExit,但通常我们通过在生成器内部检查外部条件来实现更可控的退出。
八、避免常见的坑
- 不要混用同步生成器与异步生成器:
async for只能遍历异步生成器,把它用在普通列表上会导致类型错误。可以用async for item in async_iter迭代一个包装了普通列表的异步可迭代对象,但没必要,直接用普通循环更清晰。 - 队列的
get与put必须配合task_done():如果需要在管线完成后确认所有任务都已处理,调用q.join()可以等待所有取出的项被标记为task_done()。上面的生成器包装里已经包含了task_done()。 - 异步生成器内不要使用
yield from:异步生成器有自己的async yield from语法(Python 3.6+ 支持),但更推荐直接async for x in subgen: yield x,意图更明确。 - 异常处理:如果分发器中的某次发送失败,不要影响整个流。我们在
send_to_target中捕获了异常并返回False,不会破坏生成器。
九、拓展思路
这套模式不仅适用于日志处理。任何需要“读取-转换-输出”的流式场景,比如Kafka消费、WebSocket消息中转、大CSV文件的逐行清洗入库,都可以套用同一个异步生成器管线。引入 asyncio.Queue 的最大好处是:你可以独立调整每一段的并发度和队列容量,像拧螺丝一样调优整个链路的吞吐量。
十、总结
Python 的异步生成器把生成器的惰性特点和 asyncio 的非阻塞能力结合在一起,是构建弹性流式数据处理管线的理想工具。通过 async for 串联多个异步生成器,并利用有界队列自然实现背压,我们能以非常低的复杂度写出可暂停、可取消、高吞吐的数据流组件。下一次遇到实时数据流需求,不妨尝试用这条“异步生成器 + 队列”的管线思路替代线程或手工协程调度,代码会清爽很多。

