上周在做一个气象数据聚合服务的时候,需要同时从五六个不同的第三方API拉数据,每个接口的响应时间在几百毫秒到几秒不等。最开始用传统的同步请求加多线程,线程池大小不好拍定,还经常因为一个接口阻塞拖慢整个周期。后来用asyncio重写了这一块,尤其是用上了3.11才稳定的TaskGroup,整个并发流程的控制变得前所未有的清晰——一组任务要么全部成功,要么一起取消,不再需要手动收集异常和逐个取消。
这篇文章就把那次改造的核心脉络梳理出来,从基础的asyncio.gather的局限说起,逐步过渡到TaskGroup的实战,结合超时、异常和取消的处理,最后给出一个可以直接用的生产模版。
为什么 gather 有时不够用
大部分刚接触 asyncio 的开发者都用过 asyncio.gather。它确实很方便——传入一批协程,等它们全完成,返回结果列表。但它的短板也很明显:
- 如果其中一个协程抛出异常,
gather默认会立即将异常传播出来,但其他协程并不会被自动取消,它们可能还在后台默默运行。 - 想要在某个任务失败时取消其余任务,你需要额外写
try/except并遍历Task列表手动cancel()。 - 如果忘记取消,一些协程会继续占用资源,甚至在程序退出时抛出恼人的警告。
换句话说,gather 并没有提供一种结构化的并发控制。而结构化并发正是 TaskGroup 的拿手好戏。
TaskGroup 的核心思想
Python 3.11 正式引入了 asyncio.TaskGroup,它借鉴了 Trio 库的“托儿所”模式。用法是创建一个 TaskGroup 作为上下文管理器,在其内部创建的异步任务都会在这个作用域内运行。当上下文退出时,它会自动等待所有任务完成;如果任何一个任务抛出异常,它会取消作用域内所有尚未完成的任务,然后将异常向上传播。这个机制保证了任务组要么全部成功,要么整体失败,不会有残留任务。
基础模板:并发获取多个 URL
直接看代码最直观。我们模拟从三个不同天气 API 获取数据,每个接口用 asyncio.sleep 模拟耗时,并且故意让其中一个在特定条件下失败。
import asyncio
from random import random
async def fetch_weather(source: str, city: str, fail_chance: float = 0.0):
"""模拟请求某个天气 API,source 代表接口名称"""
delay = 0.5 + random() * 2.0
await asyncio.sleep(delay)
if random() < fail_chance:
raise ConnectionError(f"{source} 请求失败: 网络波动")
return {
"source": source,
"city": city,
"temp": 15 + int(random() * 20),
"humidity": 40 + int(random() * 40),
}
async def main():
city = "北京"
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_weather("WeatherAPI", city, fail_chance=0.1))
task2 = tg.create_task(fetch_weather("OpenMeteo", city, fail_chance=0.1))
task3 = tg.create_task(fetch_weather("VisualCrossing", city, fail_chance=0.1))
# 上下文结束后,所有任务已完成(或已被取消)
results = [task1.result(), task2.result(), task3.result()]
for res in results:
print(res)
asyncio.run(main())
运行几次就会看到,一旦某个任务因 ConnectionError 失败,其余任务立即被取消,TaskGroup 重新抛出那个异常,main 函数当即中断。如果希望即使部分失败也能拿到成功的数据,就得在 TaskGroup 外面捕获异常,但要注意此时所有任务都已经被取消了,并不会像 gather(return_exceptions=True) 那样得到异常列表。
实战:部分失败也要收集结果的策略
在实际气象聚合中,我们不能因为一个 API 挂掉就啥都不返回。策略是:在单个任务内部捕获异常,保证每个任务自身不抛出未处理的异常,这样 TaskGroup 就能正常结束。我们把异常转换为特殊结果。
async def safe_fetch(source: str, city: str):
try:
return await fetch_weather(source, city, fail_chance=0.2)
except Exception as e:
return {"source": source, "error": str(e)}
async def main_safe():
city = "北京"
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(safe_fetch("WeatherAPI", city))
t2 = tg.create_task(safe_fetch("OpenMeteo", city))
t3 = tg.create_task(safe_fetch("VisualCrossing", city))
results = [t.result() for t in (t1, t2, t3)]
for res in results:
if "error" in res:
print(f"[失败] {res['source']}: {res['error']}")
else:
print(f"[成功] {res['source']}: {res['temp']}°C")
return results
这样就完美了——三个任务互不影响,全部完成后统一收集结果。但这里有个陷阱:如果 safe_fetch 内部的异常是因为 asyncio.CancelledError(比如整个组被外部取消),你把它吞掉了,任务就不会被正确取消。因此做法应该是:保留 CancelledError 的传播,只捕获业务异常。
async def safe_fetch(source: str, city: str):
try:
return await fetch_weather(source, city, fail_chance=0.2)
except asyncio.CancelledError:
raise # 让取消信号正常传播
except Exception as e:
return {"source": source, "error": str(e)}
给整个任务组加上超时
天气数据需要在一定时间内返回,否则就直接降级。我们可以用 asyncio.wait_for 包裹整个 TaskGroup 上下文,或者用 asyncio.timeout(3.11+)。这里采用后者的上下文管理器:
async def main_with_timeout():
city = "北京"
try:
async with asyncio.timeout(3.0): # 整组3秒超时
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(safe_fetch(src, city))
for src in ("WeatherAPI", "OpenMeteo", "VisualCrossing")
]
results = [t.result() for t in tasks]
except TimeoutError:
print("超出整体超时时间,部分数据可能不完整")
# 此时 tasks 内的任务已被取消,取 result() 会抛 InvalidStateError
# 因此需要提前保存已完成结果,或直接进入降级逻辑
results = []
return results
注意 TimeoutError 被捕获后,TaskGroup 内部的未完成任务已经被取消,我们无法再安全调用 task.result()。更稳健的做法是将任务引用存在列表里,在 timeout 触发前设法取出已完成的结果。实际项目里我们往往会再套一层,使用 asyncio.as_completed 配合 TaskGroup 来逐个消费完成的任务,不过那样会失去结构化并发的严格性。可以根据业务需求做折中。
真实场景:聚合多个 API 并做数据融合
回到最初的气象服务,完整的流程大致如下:
- 定义几个异步请求函数,根据不同的 API 规范构造请求头、解析响应。
- 在
TaskGroup中启动所有请求,每个请求内部做好异常隔离(只捕获业务异常)。 - 为整个组加上
timeout防止整体卡死。 - 任务全部完成后,拿到成功的结果列表,取温度、湿度等数据的中位数或加权平均。
- 如果全部失败,就使用缓存或返回降级信息。
代码骨架如下:
async def aggregate_weather(city: str):
sources = [
("WeatherAPI", "https://api.weatherapi.com/v1/current.json"),
("OpenMeteo", "https://api.open-meteo.com/v1/forecast"),
("VisualCrossing", "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline"),
]
tasks = []
try:
async with asyncio.timeout(4.0):
async with asyncio.TaskGroup() as tg:
for name, url in sources:
tasks.append(
tg.create_task(safe_fetch_from_api(name, url, city))
)
except TimeoutError:
pass # 降级逻辑在外层处理
successful = [t.result() for t in tasks if not t.cancelled() and "error" not in t.result()]
if not successful:
return await get_cached_weather(city)
avg_temp = sum(d["temp"] for d in successful) / len(successful)
return {"city": city, "avg_temp": avg_temp, "details": successful}
取消传播的细节:不要让取消信号丢失
在整个过程中,最容易被忽视的就是 CancelledError 的处理。如果在 safe_fetch 里把它吞了,那么外部调用 cancel() 时这个任务不会停止,导致 TaskGroup 无法及时关闭。牢记一条:除非你明确知道要阻止取消,否则永远不要捕获 asyncio.CancelledError 而不重新抛出。 即使在需要清理资源的 finally 块中,也应该避免长时间操作,因为取消后任务会尽快结束。
总结
从 gather 到 TaskGroup,Python 的异步编程正在向更安全、更结构化的方向演进。对于需要同时访问多个外部资源、又要保证资源不泄漏的业务来说,TaskGroup 是极佳的选择。气象聚合这个场景只是冰山一角:微服务间调用、搜索引擎并行查询、数据入库前的批量校验,都能用它把并发控制压缩在几行上下文管理器里。
如果你还在靠着 gather 加上手动 try/except 过日子,不妨翻出最近的异步代码,用 TaskGroup 重写一遍,那种“一组任务同生共死”的爽快感,比多写几个并发数更让人安心。

