Python asyncio 异步并发控制实战:构建高性能异步爬虫的完整指南

2026-06-09 0 131

在现代 Python 开发中,异步编程已经成为处理 I/O 密集型任务(如网络请求、文件读写)的标准范式。asyncio 库提供了事件循环、协程和任务,使我们能够以同步代码的写法实现高并发。然而,当并发数过高时,不仅可能压垮目标服务器,还可能导致本地资源耗尽。因此,并发控制是异步编程中的必备技能。本文将通过构建一个高性能异步爬虫的完整案例,从协程基础、任务创建、Semaphore 限流到异常处理,手把手带你掌握 asyncio 的核心用法和最佳实践。

异步编程的基础概念回顾

在深入代码之前,我们先理清几个关键术语:

协程 (Coroutine)
使用 async def 定义的函数,调用时返回一个协程对象,不会立即执行。
事件循环 (Event Loop)
负责调度和执行协程的循环,不断检查哪些协程可以继续执行(如 I/O 完成)。
任务 (Task)
通过 asyncio.create_task() 将协程包装成任务,交给事件循环调度,可以并发运行。
可等待对象 (Awaitable)
能够被 await 的对象,包括协程、任务和 Future。

异步编程的优势在于,当一个协程在等待 I/O 操作(如网络响应)时,事件循环会切换到其他协程执行,从而避免线程阻塞,提高整体吞吐量。

环境准备与 aiohttp 安装

本案例需要 Python 3.8 及以上版本。我们将使用 aiohttp 作为异步 HTTP 客户端,因为它天然支持 asyncio,比 requests 更高效。安装命令:

                
pip install aiohttp
                
            

如果你的 Python 环境缺少 asyncio,其实它是标准库,无需额外安装。另外,我们需要一个目标网站来爬取,这里以 JSONPlaceholder (https://jsonplaceholder.typicode.com) 提供的示例 API 作为采集目标,该服务允许我们无风险地测试高并发请求。

从普通函数到协程:定义与运行

最简单的协程定义:

                
import asyncio

async def greet(name):
    await asyncio.sleep(0.1)   # 模拟 I/O
    return f"Hello, {name}"

# 运行协程
result = asyncio.run(greet("World"))
print(result)  # Hello, World
                
            

asyncio.run() 是 Python 3.7+ 提供的高级 API,自动创建事件循环并运行协程。在真实项目中,我们通常只调用一次 asyncio.run() 启动主协程。

下面我们逐步封装一个异步 HTTP 请求函数,用于后续的爬虫:

                
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()
                
            

任务调度:使用 asyncio.gather 并发执行

要并发地请求多个 URL,可以将协程包装成任务,然后通过 asyncio.gather 等待所有任务完成。

                
async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r['title'])

asyncio.run(main())
                
            

这段代码会几乎同时发起3个 HTTP 请求,总耗时接近于最慢的那个请求,大大优于顺序执行。但如果 URL 列表包含上百个链接,全部并发可能会导致目标服务器拒绝服务或本地端口耗尽,因此需要引入限流。

并发限流:Semaphore 控制最大协程数

Semaphore(信号量) 是一种常见的并发控制机制。我们可以创建一个 Semaphore 对象,指定最大并发数量。在协程开始时获取信号量,结束时释放。Python 的 asyncio.Semaphore 提供了异步上下文管理器,使用更简洁。

改写 fetch 函数,加入 Semaphore 控制:

                
async def fetch_limited(session, url, semaphore):
    async with semaphore:  # 进入时获取信号量,退出时释放
        async with session.get(url) as response:
            return await response.json()
                
            

然后在主函数中创建 Semaphore,例如限制最多同时 5 个请求:

                
async def main_limited():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
    sem = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"获取到 {len(results)} 篇文章")
                
            

这样,即使有 100 个任务,任意时刻最多只有 5 个请求在飞行中,其余协程会在 async with sem 处挂起,直到有信号量可用。这是异步爬虫的经典模式。

完整爬虫案例:批量采集文章标题

现在我们将以上碎片组合成完整的爬虫脚本,包含以下功能:

  • 从 JSONPlaceholder API 批量获取文章数据。
  • 使用 Semaphore 限制并发数为 10。
  • 添加适当的超时设置。
  • 收集并打印结果。
                
import asyncio
import aiohttp

MAX_CONCURRENT = 10
TIMEOUT = aiohttp.ClientTimeout(total=30)

async def fetch_post(session, post_id, sem):
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    async with sem:
        try:
            async with session.get(url, timeout=TIMEOUT) as resp:
                data = await resp.json()
                return post_id, data['title']
        except Exception as e:
            return post_id, f"Error: {e}"

async def main():
    post_ids = range(1, 101)  # 采集 100 篇文章
    sem = asyncio.Semaphore(MAX_CONCURRENT)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_post(session, pid, sem) for pid in post_ids]
        results = await asyncio.gather(*tasks)

    # 打印结果
    for post_id, title in sorted(results, key=lambda x: x[0]):
        print(f"[{post_id}] {title}")

if __name__ == "__main__":
    asyncio.run(main())
                
            

执行该脚本,它会并发请求 100 个页面,但由于 Semaphore=10,同一时刻仅有 10 个请求在进行,既保证了效率,又避免了对目标服务的冲击。整个采集过程一般在几秒内完成。

健壮性处理:超时、重试与异常捕获

在实际生产中,网络波动、服务端错误等不可避免。我们需要在异步爬虫中加入重试逻辑。可以编写一个装饰器或直接在函数内部实现指数退避重试。

修改 fetch_post 加入重试:

                
import random

async def fetch_post_with_retry(session, post_id, sem, max_retries=3):
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    for attempt in range(1, max_retries + 1):
        async with sem:
            try:
                async with session.get(url, timeout=TIMEOUT) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return post_id, data['title']
                    else:
                        # 服务端返回错误状态码,可能是临时问题,可以重试
                        raise aiohttp.ClientResponseError(
                            resp.request_info, resp.history, status=resp.status
                        )
            except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                print(f"请求 {post_id} 第 {attempt} 次失败: {e}")
                if attempt == max_retries:
                    return post_id, f"Failed after {max_retries} tries"
                # 等待一段随机时间再重试
                await asyncio.sleep(1 * attempt + random.random())
                
            

同时,建议为 asyncio.gather 设置 return_exceptions=True,防止单个任务异常导致整个批次崩溃:

                
results = await asyncio.gather(*tasks, return_exceptions=True)
                
            

这样,即使某些请求失败,返回的也是 Exception 对象,我们可以统一处理。

性能对比与优化建议

我们可以简单对比一下同步、无限制并发和 Semaphore 控制的性能差异。以采集 20 个页面为例(每个页面假设响应时间约 100ms):

  • 顺序同步:大约 20 × 100ms = 2 秒。
  • 全并发(无 Semaphore):理论上约 100ms,但可能因连接数过多而部分请求超时或拒绝。
  • Semaphore=10:大约 2 批 × 100ms = 200ms,既快又稳定。

在实际项目中,并发数设定需要根据目标服务器的承载能力和本机网络状况调整。一般从 5 到 20 较为常见。此外,可以结合 asyncio.Queue 实现生产者-消费者模式,动态添加任务,进一步优化资源利用。

总结

本文通过一个完整的异步爬虫案例,从协程基础、任务并发、Semaphore 限流到重试机制,系统讲解了 Python asyncio 在 I/O 密集型场景下的实战应用。掌握这些技术后,你可以轻松构建高效率、高可控性的异步程序,无论是爬虫、API 客户端还是后端服务。现在,就将这份代码模板应用到你的项目中,享受异步编程带来的性能红利吧。

Python asyncio 异步并发控制实战:构建高性能异步爬虫的完整指南
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

版权声明:
本站资源有的来自互联网收集整理,本站纯免费分享提供学习使用,如果侵犯了您的合法权益,请联系本站我们会及时删除。
本站资源仅供研究、学习交流之用,免费开源项目不代表完全可商用,若商业用途请先咨询开发企业能否商用,否则产生的一切后果将由下载用户自行承担。
原创板块未经允许不得转载,否则将追究法律责任。

淘吗网 python Python asyncio 异步并发控制实战:构建高性能异步爬虫的完整指南 https://www.taomawang.com/server/python/2118.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务