Python asyncio TaskGroup实战:高并发数据抓取与优雅取消全掌握

2026-06-18 0 713

上周在做一个气象数据聚合服务的时候,需要同时从五六个不同的第三方API拉数据,每个接口的响应时间在几百毫秒到几秒不等。最开始用传统的同步请求加多线程,线程池大小不好拍定,还经常因为一个接口阻塞拖慢整个周期。后来用asyncio重写了这一块,尤其是用上了3.11才稳定的TaskGroup,整个并发流程的控制变得前所未有的清晰——一组任务要么全部成功,要么一起取消,不再需要手动收集异常和逐个取消。

这篇文章就把那次改造的核心脉络梳理出来,从基础的asyncio.gather的局限说起,逐步过渡到TaskGroup的实战,结合超时、异常和取消的处理,最后给出一个可以直接用的生产模版。

为什么 gather 有时不够用

大部分刚接触 asyncio 的开发者都用过 asyncio.gather。它确实很方便——传入一批协程,等它们全完成,返回结果列表。但它的短板也很明显:

  • 如果其中一个协程抛出异常,gather 默认会立即将异常传播出来,但其他协程并不会被自动取消,它们可能还在后台默默运行。
  • 想要在某个任务失败时取消其余任务,你需要额外写 try/except 并遍历 Task 列表手动 cancel()
  • 如果忘记取消,一些协程会继续占用资源,甚至在程序退出时抛出恼人的警告。

换句话说,gather 并没有提供一种结构化的并发控制。而结构化并发正是 TaskGroup 的拿手好戏。

TaskGroup 的核心思想

Python 3.11 正式引入了 asyncio.TaskGroup,它借鉴了 Trio 库的“托儿所”模式。用法是创建一个 TaskGroup 作为上下文管理器,在其内部创建的异步任务都会在这个作用域内运行。当上下文退出时,它会自动等待所有任务完成;如果任何一个任务抛出异常,它会取消作用域内所有尚未完成的任务,然后将异常向上传播。这个机制保证了任务组要么全部成功,要么整体失败,不会有残留任务。

基础模板:并发获取多个 URL

直接看代码最直观。我们模拟从三个不同天气 API 获取数据,每个接口用 asyncio.sleep 模拟耗时,并且故意让其中一个在特定条件下失败。

import asyncio
from random import random

async def fetch_weather(source: str, city: str, fail_chance: float = 0.0):
    """模拟请求某个天气 API,source 代表接口名称"""
    delay = 0.5 + random() * 2.0
    await asyncio.sleep(delay)
    if random() < fail_chance:
        raise ConnectionError(f"{source} 请求失败: 网络波动")
    return {
        "source": source,
        "city": city,
        "temp": 15 + int(random() * 20),
        "humidity": 40 + int(random() * 40),
    }

async def main():
    city = "北京"
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_weather("WeatherAPI", city, fail_chance=0.1))
        task2 = tg.create_task(fetch_weather("OpenMeteo", city, fail_chance=0.1))
        task3 = tg.create_task(fetch_weather("VisualCrossing", city, fail_chance=0.1))

    # 上下文结束后,所有任务已完成(或已被取消)
    results = [task1.result(), task2.result(), task3.result()]
    for res in results:
        print(res)

asyncio.run(main())

运行几次就会看到,一旦某个任务因 ConnectionError 失败,其余任务立即被取消,TaskGroup 重新抛出那个异常,main 函数当即中断。如果希望即使部分失败也能拿到成功的数据,就得在 TaskGroup 外面捕获异常,但要注意此时所有任务都已经被取消了,并不会像 gather(return_exceptions=True) 那样得到异常列表。

实战:部分失败也要收集结果的策略

在实际气象聚合中,我们不能因为一个 API 挂掉就啥都不返回。策略是:在单个任务内部捕获异常,保证每个任务自身不抛出未处理的异常,这样 TaskGroup 就能正常结束。我们把异常转换为特殊结果。

async def safe_fetch(source: str, city: str):
    try:
        return await fetch_weather(source, city, fail_chance=0.2)
    except Exception as e:
        return {"source": source, "error": str(e)}

async def main_safe():
    city = "北京"
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(safe_fetch("WeatherAPI", city))
        t2 = tg.create_task(safe_fetch("OpenMeteo", city))
        t3 = tg.create_task(safe_fetch("VisualCrossing", city))

    results = [t.result() for t in (t1, t2, t3)]
    for res in results:
        if "error" in res:
            print(f"[失败] {res['source']}: {res['error']}")
        else:
            print(f"[成功] {res['source']}: {res['temp']}°C")
    return results

这样就完美了——三个任务互不影响,全部完成后统一收集结果。但这里有个陷阱:如果 safe_fetch 内部的异常是因为 asyncio.CancelledError(比如整个组被外部取消),你把它吞掉了,任务就不会被正确取消。因此做法应该是:保留 CancelledError 的传播,只捕获业务异常。

async def safe_fetch(source: str, city: str):
    try:
        return await fetch_weather(source, city, fail_chance=0.2)
    except asyncio.CancelledError:
        raise  # 让取消信号正常传播
    except Exception as e:
        return {"source": source, "error": str(e)}

给整个任务组加上超时

天气数据需要在一定时间内返回,否则就直接降级。我们可以用 asyncio.wait_for 包裹整个 TaskGroup 上下文,或者用 asyncio.timeout(3.11+)。这里采用后者的上下文管理器:

async def main_with_timeout():
    city = "北京"
    try:
        async with asyncio.timeout(3.0):  # 整组3秒超时
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(safe_fetch(src, city))
                    for src in ("WeatherAPI", "OpenMeteo", "VisualCrossing")
                ]
            results = [t.result() for t in tasks]
    except TimeoutError:
        print("超出整体超时时间,部分数据可能不完整")
        # 此时 tasks 内的任务已被取消,取 result() 会抛 InvalidStateError
        # 因此需要提前保存已完成结果,或直接进入降级逻辑
        results = []
    return results

注意 TimeoutError 被捕获后,TaskGroup 内部的未完成任务已经被取消,我们无法再安全调用 task.result()。更稳健的做法是将任务引用存在列表里,在 timeout 触发前设法取出已完成的结果。实际项目里我们往往会再套一层,使用 asyncio.as_completed 配合 TaskGroup 来逐个消费完成的任务,不过那样会失去结构化并发的严格性。可以根据业务需求做折中。

真实场景:聚合多个 API 并做数据融合

回到最初的气象服务,完整的流程大致如下:

  1. 定义几个异步请求函数,根据不同的 API 规范构造请求头、解析响应。
  2. TaskGroup 中启动所有请求,每个请求内部做好异常隔离(只捕获业务异常)。
  3. 为整个组加上 timeout 防止整体卡死。
  4. 任务全部完成后,拿到成功的结果列表,取温度、湿度等数据的中位数或加权平均。
  5. 如果全部失败,就使用缓存或返回降级信息。

代码骨架如下:

async def aggregate_weather(city: str):
    sources = [
        ("WeatherAPI", "https://api.weatherapi.com/v1/current.json"),
        ("OpenMeteo", "https://api.open-meteo.com/v1/forecast"),
        ("VisualCrossing", "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline"),
    ]
    tasks = []
    try:
        async with asyncio.timeout(4.0):
            async with asyncio.TaskGroup() as tg:
                for name, url in sources:
                    tasks.append(
                        tg.create_task(safe_fetch_from_api(name, url, city))
                    )
    except TimeoutError:
        pass  # 降级逻辑在外层处理

    successful = [t.result() for t in tasks if not t.cancelled() and "error" not in t.result()]
    if not successful:
        return await get_cached_weather(city)
    avg_temp = sum(d["temp"] for d in successful) / len(successful)
    return {"city": city, "avg_temp": avg_temp, "details": successful}

取消传播的细节:不要让取消信号丢失

在整个过程中,最容易被忽视的就是 CancelledError 的处理。如果在 safe_fetch 里把它吞了,那么外部调用 cancel() 时这个任务不会停止,导致 TaskGroup 无法及时关闭。牢记一条:除非你明确知道要阻止取消,否则永远不要捕获 asyncio.CancelledError 而不重新抛出。 即使在需要清理资源的 finally 块中,也应该避免长时间操作,因为取消后任务会尽快结束。

总结

gatherTaskGroup,Python 的异步编程正在向更安全、更结构化的方向演进。对于需要同时访问多个外部资源、又要保证资源不泄漏的业务来说,TaskGroup 是极佳的选择。气象聚合这个场景只是冰山一角:微服务间调用、搜索引擎并行查询、数据入库前的批量校验,都能用它把并发控制压缩在几行上下文管理器里。

如果你还在靠着 gather 加上手动 try/except 过日子,不妨翻出最近的异步代码,用 TaskGroup 重写一遍,那种“一组任务同生共死”的爽快感,比多写几个并发数更让人安心。

Python asyncio TaskGroup实战:高并发数据抓取与优雅取消全掌握
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

版权声明:
本站资源有的来自互联网收集整理,本站纯免费分享提供学习使用,如果侵犯了您的合法权益,请联系本站我们会及时删除。
本站资源仅供研究、学习交流之用,免费开源项目不代表完全可商用,若商业用途请先咨询开发企业能否商用,否则产生的一切后果将由下载用户自行承担。
原创板块未经允许不得转载,否则将追究法律责任。

淘吗网 python Python asyncio TaskGroup实战:高并发数据抓取与优雅取消全掌握 https://www.taomawang.com/server/python/2203.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务