在Python的异步编程演进中,asyncio.TaskGroup 自3.11版本正式登场,它以一种更结构化、更安全的方式管理并发任务,同时引入了 ExceptionGroup 和 except* 语法,彻底改变了我们对异步任务异常的认知模式。本文将从零开始,通过对比 asyncio.gather 的局限性,深入剖析 TaskGroup 的核心机制,并最终通过构建一个高效稳定的异步网络爬虫来展示其真正的威力。
一、为什么我们需要 TaskGroup?告别 gather 的混乱
在 TaskGroup 出现之前,开发者主要依赖 asyncio.gather 来同时运行多个协程。这种方式虽然可行,但存在几个明显的缺陷:
- 异常处理混乱:当多个任务同时失败时,
gather只会抛出第一个遇到的异常,其余异常被丢失或需要手动收集。 - 取消行为不直观:一个任务失败后,其他任务可能继续运行,除非显式调用
cancel,容易造成资源浪费。 - 任务生命周期管理困难:无法在语义上明确表示“这些任务属于同一组,应该共同进退”。
TaskGroup 完美解决了这些问题。它通过上下文管理器协议,在退出 async with 块时自动确保所有已创建的任务都已完成,并将多任务异常打包为 ExceptionGroup,允许开发者用 except* 进行精细捕获。
二、TaskGroup 核心用法速览
以下是一个最简单的例子,展示 TaskGroup 的基本语法:
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"任务{name}完成"
async def main():
async with asyncio.TaskGroup() as tg:
# 在组内创建任务
t1 = tg.create_task(task("A", 2))
t2 = tg.create_task(task("B", 1))
# 这里可以继续创建更多任务
# async with 块退出时,所有任务已经完成
print(t1.result()) # 获取任务结果
print(t2.result())
asyncio.run(main())
输出顺序永远是先 B 后 A,因为任务 B 延迟更短。注意:在 async with 块内创建的任务,会在块结束时自动等待,无需手动 await t1 等。这提供了极强的结构性——所有并发工作都被明确地限定在组的作用域内。
2.1 与 gather 在异常处理上的关键区别
假设有两个任务同时抛出异常,gather 只会抛出第一个:
async def fail1():
await asyncio.sleep(0.1)
raise ValueError("错误1")
async def fail2():
await asyncio.sleep(0.2)
raise TypeError("错误2")
async def gather_example():
try:
await asyncio.gather(fail1(), fail2())
except Exception as e:
print(f"捕获到: {type(e).__name__}: {e}")
# 仅输出 ValueError,TypeError 丢失!
而使用 TaskGroup,两个异常会被合并为一个 ExceptionGroup:
async def taskgroup_example():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fail1())
tg.create_task(fail2())
except* ValueError as ex:
print(f"处理ValueError: {ex!r}")
except* TypeError as ex:
print(f"处理TypeError: {ex!r}")
# 两种异常都会得到处理
这种异常组机制确保了没有异常被静默吞噬,每个异常都可以被针对性地处理。
三、深入解析 except* 与异常组
except* 是 Python 3.11 为配合 ExceptionGroup 引入的新语法。它可以在一个 try 块中匹配异常组内特定类型的异常,并将剩余异常重新组合为新的异常组继续传播(如果需要)。这在任务组场景中非常重要,因为我们可以对待不同类型的任务失败执行不同的恢复策略。
async def robust_job(job_id, fail_type=None):
await asyncio.sleep(0.1)
if fail_type == 'value':
raise ValueError(f"任务{job_id}数值错误")
elif fail_type == 'type':
raise TypeError(f"任务{job_id}类型错误")
return f"任务{job_id}成功"
async def handle_taskgroup_errors():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(robust_job(1))
tg.create_task(robust_job(2, 'value'))
tg.create_task(robust_job(3, 'type'))
except* ValueError as eg:
for err in eg.exceptions:
print(f"记录日志:{err}")
# 这里可以决定是否重新抛出
except* TypeError as eg:
print(f"发送告警:{eg.exceptions}")
raise # 类型错误不可恢复,向上传播
asyncio.run(handle_taskgroup_errors())
在这个例子中,ValueError 被捕获并记录日志,但不会阻止程序继续;而 TypeError 则被重新抛出,表示更严重的故障。这种细粒度控制在过去是极其繁琐的。
四、实战案例:构建一个高并发异步网页爬虫
现在,我们利用 TaskGroup 构建一个完整的异步爬虫,它能够并发抓取多个 URL,动态控制并发数,并对各种网络异常进行分类处理。
4.1 项目需求
- 给定一批 URL,并发发起 HTTP 请求
- 限制最大并发数(通过信号量)
- 对不同类型的异常(超时、连接错误、HTTP错误)分别处理
- 所有任务完成后输出统计报告
- 使用
TaskGroup管理所有抓取任务
4.2 完整代码实现
import asyncio
import aiohttp
import time
from collections import Counter
class AsyncCrawler:
def __init__(self, concurrency=5, timeout=10):
self.concurrency = concurrency
self.timeout = timeout
self.semaphore = asyncio.Semaphore(concurrency)
self.stats = Counter(success=0, timeout=0, connection_error=0, http_error=0, other=0)
async def fetch_url(self, session, url):
"""单个URL抓取协程,包含信号量控制"""
async with self.semaphore:
try:
async with session.get(url, timeout=self.timeout) as response:
if response.status >= 400:
raise aiohttp.ClientResponseError(
status=response.status,
message=f"HTTP {response.status}",
headers=response.headers,
request_info=response.request_info
)
content = await response.text()
self.stats['success'] += 1
return url, len(content), response.status
except asyncio.TimeoutError:
self.stats['timeout'] += 1
print(f"超时:{url}")
raise
except aiohttp.ClientConnectionError:
self.stats['connection_error'] += 1
print(f"连接错误:{url}")
raise
except aiohttp.ClientResponseError as e:
self.stats['http_error'] += 1
print(f"HTTP错误 {e.status}:{url}")
raise
except Exception:
self.stats['other'] += 1
raise
async def run(self, urls):
"""主执行方法:使用TaskGroup并发抓取所有URL"""
start = time.time()
connector = aiohttp.TCPConnector(limit_per_host=self.concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
try:
async with asyncio.TaskGroup() as tg:
tasks = []
for url in urls:
task = tg.create_task(self.fetch_url(session, url))
tasks.append(task)
except* asyncio.TimeoutError as eg:
print(f"共 {len(eg.exceptions)} 个任务超时")
except* aiohttp.ClientConnectionError as eg:
print(f"共 {len(eg.exceptions)} 个连接失败")
except* aiohttp.ClientResponseError as eg:
for exc in eg.exceptions:
print(f"HTTP错误状态码 {exc.status}")
except* Exception as eg:
print(f"其他异常: {len(eg.exceptions)} 个")
elapsed = time.time() - start
print(f"n抓取完成,耗时 {elapsed:.1f} 秒")
print(f"成功: {self.stats['success']} 个")
print(f"超时: {self.stats['timeout']} 个")
print(f"连接错误: {self.stats['connection_error']} 个")
print(f"HTTP错误: {self.stats['http_error']} 个")
print(f"其他: {self.stats['other']} 个")
# 模拟一批待抓取的URL(包含必然超时或无效的地址以测试异常处理)
test_urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
"https://httpbin.org/status/500",
"https://invalid.domain.example/", # 连接错误
"https://httpbin.org/delay/15", # 会超时(timeout=10)
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
async def main():
crawler = AsyncCrawler(concurrency=3, timeout=10)
await crawler.run(test_urls)
if __name__ == "__main__":
asyncio.run(main())
4.3 案例解析
这个爬虫实现展示了 TaskGroup 在生产环境中的几个关键应用:
- 信号量并发控制:通过
asyncio.Semaphore限制同时发出的请求数,避免对目标服务器造成过大压力。 - 结构化异常处理:在
run方法中,使用多个except*分支分别处理不同异常类型,构建完善的错误统计。 - 任务组自动等待:无需手动维护任务列表并逐个
await,代码更加简洁清晰。 - 资源管理:
aiohttp.ClientSession和TCPConnector通过async with管理,确保连接正确关闭。
五、进阶技巧:添加任务超时与优雅取消
在实际爬虫中,可能需要对整个抓取过程设置一个总超时时间。Python 3.11 引入了 asyncio.timeout 上下文管理器,可以包裹 TaskGroup 来实现:
async def run_with_global_timeout(self, urls, global_timeout=30):
try:
async with asyncio.timeout(global_timeout):
await self.run(urls)
except asyncio.TimeoutError:
print(f"全局超时({global_timeout}秒)已触发,强制终止所有任务")
当超时发生时,asyncio.timeout 会自动取消其作用域内所有未完成的任务,并且 TaskGroup 内的任务也会被传递取消信号。结合 except* 可以捕获 CancelledError(尽管通常不需要显式处理取消)。
此外,如果我们想更精细地控制每个任务的超时,可以在 fetch_url 内部使用 asyncio.timeout,但这里我们通过 aiohttp 的超时参数已做了单请求超时。
六、性能对比:TaskGroup vs 传统 gather
为了直观展示 TaskGroup 在异常多发的并发场景下的优势,我们编写一个压力测试:模拟100个任务,其中30%会随机抛出异常,比较两种方案的处理逻辑复杂度与异常覆盖完整性。
async def simulate_tasks_with_failures(n=100, fail_rate=0.3):
import random
async def random_fail(id):
await asyncio.sleep(random.uniform(0.01, 0.05))
if random.random() < fail_rate:
raise ValueError(f"任务{id}故意失败")
return id
# 使用 TaskGroup
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(random_fail(i)) for i in range(n)]
# 异常会自动传播,使用except*可完整捕获
# ...
虽然性能差异并非显著(两者底层任务调度相似),但 TaskGroup 的异常完整性使得错误排查不再依赖于人为的日志记录,所有失败任务都被打包上报。这在高并发数据采集和微服务编排中尤为重要。
七、最佳实践与注意事项
- 始终在
async with块内创建任务:任务的生命周期与组绑定,离开块时确保所有任务结束。 - 避免在组内直接
await任务:创建任务后立即await会失去并发性,应该利用组的自动等待特性。 - 合理使用
except*而非except:在 TaskGroup 的异常处理中使用except*能够接触到所有异常实例。 - 信号量限制并发:对于网络或数据库操作,务必通过信号量或连接池限制并发数,避免资源耗尽。
- 结合
asyncio.timeout实现全局超时控制:防止单个异常任务阻塞整个组。
八、结语
asyncio.TaskGroup 与异常组是 Python 异步编程领域的一次重要进化。它们不仅让并发代码更加结构化、安全,还提供了过去难以实现的细粒度异常管理能力。本文通过一个完整的异步爬虫项目,展示了从任务创建、并发控制到异常统计的全链路实践。将这套模式应用到你的 Web 服务、数据管道或分布式任务调度中,你将发现 Python 异步代码的维护成本显著降低,而健壮性大幅提升。
现在,打开你的 Python 3.11+ 环境,用 TaskGroup 重写一个旧有的 gather 场景,亲身体验这种结构化并发带来的优雅吧。

