Python 异步上下文管理器与迭代器:构建高效异步应用

2026-05-10 0 781

2025年,Python异步编程已经成为构建高性能IO密集型应用的标准方案。异步上下文管理器异步迭代器asyncio生态中两个重要的高级特性,它们让异步资源管理和数据流处理变得更加优雅和高效。本文通过四个实战案例,带你掌握这些现代Python特性。


1. 为什么需要异步上下文管理器与迭代器?

传统同步上下文管理器(with语句)和迭代器(for循环)在异步环境中会阻塞事件循环。异步上下文管理器(async with)和异步迭代器(async for)允许在异步操作中安全地管理资源和处理数据流,不会阻塞事件循环。

  • 异步上下文管理器:使用__aenter__和__aexit__管理异步资源
  • 异步迭代器:使用__anext__和__aiter__实现异步数据流
  • 异步生成器:使用async yield简化异步迭代器实现

2. 异步上下文管理器基础:定义与使用

异步上下文管理器通过__aenter__和__aexit__方法实现,使用async with语句管理。

import asyncio

class AsyncDatabaseConnection:
    """模拟异步数据库连接"""
    
    async def __aenter__(self):
        print("正在连接数据库...")
        await asyncio.sleep(1)  # 模拟连接耗时
        self.connection = "db_connection_123"
        print("数据库连接成功")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("正在关闭数据库连接...")
        await asyncio.sleep(0.5)  # 模拟关闭耗时
        self.connection = None
        print("数据库连接已关闭")
        return False  # 返回False表示不抑制异常
    
    async def query(self, sql: str):
        """执行查询"""
        await asyncio.sleep(0.3)  # 模拟查询耗时
        return f"结果: {sql}"

async def main():
    # 使用异步上下文管理器
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    print("资源已释放")

# 运行
asyncio.run(main())

3. 实战案例一:异步HTTP客户端连接池

使用异步上下文管理器管理HTTP连接池,自动回收连接。

import asyncio
import aiohttp
from typing import Optional

class AsyncHttpPool:
    """异步HTTP连接池管理器"""
    
    def __init__(self, max_connections: int = 10):
        self.max_connections = max_connections
        self.session: Optional[aiohttp.ClientSession] = None
        self._semaphore = asyncio.Semaphore(max_connections)
    
    async def __aenter__(self):
        print(f"初始化HTTP连接池 (最大连接数: {self.max_connections})")
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭HTTP连接池...")
        if self.session:
            await self.session.close()
        print("HTTP连接池已关闭")
        return False
    
    async def fetch(self, url: str) -> str:
        """使用连接池发送请求"""
        async with self._semaphore:
            if not self.session:
                raise RuntimeError("连接池未初始化")
            async with self.session.get(url) as response:
                return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/0.5"
    ]
    
    # 使用异步上下文管理器管理连接池
    async with AsyncHttpPool(max_connections=5) as pool:
        tasks = [pool.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, result in zip(urls, results):
            print(f"{url}: 收到 {len(result)} 字节")

asyncio.run(main())

4. 实战案例二:异步迭代器实现分页数据读取

使用异步迭代器从API分页读取数据,支持懒加载。

import asyncio
import aiohttp
from typing import AsyncIterator, Any

class PaginatedApiIterator:
    """异步分页数据迭代器"""
    
    def __init__(self, base_url: str, page_size: int = 20):
        self.base_url = base_url
        self.page_size = page_size
        self.current_page = 0
        self.session: Optional[aiohttp.ClientSession] = None
    
    def __aiter__(self):
        return self
    
    async def __anext__(self) -> list[dict]:
        """异步获取下一页数据"""
        self.current_page += 1
        url = f"{self.base_url}?page={self.current_page}&size={self.page_size}"
        
        if not self.session:
            self.session = aiohttp.ClientSession()
        
        async with self.session.get(url) as response:
            data = await response.json()
            
            # 如果没有数据,停止迭代
            if not data or len(data) == 0:
                await self.session.close()
                self.session = None
                raise StopAsyncIteration
            
            return data
    
    async def cleanup(self):
        """清理资源"""
        if self.session:
            await self.session.close()
            self.session = None

# 使用异步生成器实现相同功能(更简洁)
async def paginated_api_generator(base_url: str, page_size: int = 20) -> AsyncIterator[list[dict]]:
    """异步生成器实现分页读取"""
    current_page = 0
    async with aiohttp.ClientSession() as session:
        while True:
            current_page += 1
            url = f"{base_url}?page={current_page}&size={page_size}"
            
            async with session.get(url) as response:
                data = await response.json()
                
                if not data or len(data) == 0:
                    return  # 结束生成器
                
                yield data

async def main():
    # 使用异步迭代器
    print("使用异步迭代器:")
    async for page_data in PaginatedApiIterator("https://api.example.com/users", 10):
        print(f"收到 {len(page_data)} 条记录")
        # 处理数据...
        break  # 仅演示第一页
    
    # 使用异步生成器
    print("n使用异步生成器:")
    async for page_data in paginated_api_generator("https://api.example.com/users", 10):
        print(f"收到 {len(page_data)} 条记录")
        break  # 仅演示第一页

asyncio.run(main())

5. 实战案例三:异步上下文管理器实现分布式锁

使用异步上下文管理器实现基于Redis的分布式锁。

import asyncio
import aioredis
import uuid
from typing import Optional

class AsyncDistributedLock:
    """异步分布式锁"""
    
    def __init__(self, redis_client, lock_key: str, timeout: int = 10):
        self.redis = redis_client
        self.lock_key = f"lock:{lock_key}"
        self.timeout = timeout
        self.lock_value = str(uuid.uuid4())
        self._locked = False
    
    async def __aenter__(self):
        # 尝试获取锁
        while True:
            acquired = await self.redis.set(
                self.lock_key,
                self.lock_value,
                nx=True,  # 只有key不存在时才设置
                ex=self.timeout  # 过期时间
            )
            
            if acquired:
                self._locked = True
                print(f"获取锁成功: {self.lock_key}")
                return self
            
            # 锁被占用,等待后重试
            print(f"等待锁: {self.lock_key}")
            await asyncio.sleep(0.5)
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._locked:
            # 释放锁(使用Lua脚本确保原子性)
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            await self.redis.eval(lua_script, 1, self.lock_key, self.lock_value)
            self._locked = False
            print(f"释放锁: {self.lock_key}")
        return False

# 模拟共享资源
shared_counter = 0

async def increment_counter(redis, lock_name: str, increments: int):
    """使用分布式锁安全递增计数器"""
    for _ in range(increments):
        async with AsyncDistributedLock(redis, lock_name) as lock:
            global shared_counter
            current = shared_counter
            await asyncio.sleep(0.01)  # 模拟处理延迟
            shared_counter = current + 1
    print(f"递增完成,当前值: {shared_counter}")

async def main():
    # 创建Redis连接
    redis = await aioredis.from_url("redis://localhost:6379")
    
    # 并发执行多个任务
    tasks = [
        increment_counter(redis, "counter_lock", 100),
        increment_counter(redis, "counter_lock", 100),
        increment_counter(redis, "counter_lock", 100)
    ]
    
    await asyncio.gather(*tasks)
    print(f"最终计数器值: {shared_counter} (预期: 300)")
    
    await redis.close()

# 注意:需要安装 aioredis 库
# pip install aioredis
# asyncio.run(main())

6. 实战案例四:异步生成器实现实时数据流处理

使用异步生成器处理实时数据流,支持背压和取消。

import asyncio
from typing import AsyncGenerator
import random

class SensorSimulator:
    """模拟传感器数据流"""
    
    def __init__(self, sensor_id: str, interval: float = 0.5):
        self.sensor_id = sensor_id
        self.interval = interval
        self._running = False
    
    async def read_stream(self) -> AsyncGenerator[dict, None]:
        """异步生成器:持续读取传感器数据"""
        self._running = True
        try:
            while self._running:
                # 模拟读取传感器
                await asyncio.sleep(self.interval)
                data = {
                    "sensor_id": self.sensor_id,
                    "temperature": round(random.uniform(20, 30), 2),
                    "humidity": round(random.uniform(40, 70), 2),
                    "timestamp": asyncio.get_event_loop().time()
                }
                yield data
        finally:
            print(f"传感器 {self.sensor_id} 数据流已停止")
    
    def stop(self):
        self._running = False

class DataProcessor:
    """数据处理器"""
    
    async def process_stream(self, stream: AsyncGenerator[dict, None], 
                              threshold: float = 28.0) -> AsyncGenerator[dict, None]:
        """处理数据流,过滤和转换数据"""
        async for data in stream:
            # 数据转换
            processed = {
                "id": data["sensor_id"],
                "temp_c": data["temperature"],
                "temp_f": round(data["temperature"] * 9/5 + 32, 2),
                "humidity": data["humidity"],
                "alert": data["temperature"] > threshold
            }
            yield processed
    
    async def alert_handler(self, stream: AsyncGenerator[dict, None]):
        """处理告警数据"""
        async for data in stream:
            if data["alert"]:
                print(f"⚠️ 告警: 传感器 {data['id']} 温度过高 ({data['temp_c']}°C)")
            else:
                print(f"✅ 正常: 传感器 {data['id']} 温度 {data['temp_c']}°C")

async def main():
    # 创建传感器
    sensor = SensorSimulator("sensor_01", interval=0.3)
    processor = DataProcessor()
    
    # 构建处理管道
    raw_stream = sensor.read_stream()
    processed_stream = processor.process_stream(raw_stream)
    
    # 启动告警处理器(运行5秒后停止)
    alert_task = asyncio.create_task(processor.alert_handler(processed_stream))
    
    await asyncio.sleep(5)
    sensor.stop()  # 停止传感器
    await alert_task  # 等待任务完成

asyncio.run(main())

7. 性能对比:同步 vs 异步资源管理

场景 同步方式 异步方式
HTTP连接池管理 阻塞等待,连接利用率低 非阻塞,高并发连接
分页数据读取 串行读取,等待IO 异步迭代,懒加载
分布式锁 阻塞等待锁释放 非阻塞等待,可取消
实时数据流处理 阻塞读取,处理延迟高 异步生成器,背压支持

8. 最佳实践总结

  • 使用async with管理异步资源:数据库连接、HTTP会话、文件句柄
  • 使用async for处理异步数据流:分页API、消息队列、实时数据
  • 使用异步生成器简化迭代器:async yield比实现__anext__更简洁
  • 注意资源清理:在__aexit__中确保释放所有资源
  • 超时控制:使用asyncio.wait_for为异步操作设置超时
# 最佳实践:异步上下文管理器超时控制
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def timeout_context(timeout: float):
    """带超时的异步上下文管理器"""
    try:
        yield await asyncio.wait_for(asyncio.sleep(0), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"操作超时 ({timeout}秒)")
        raise

# 最佳实践:异步迭代器错误处理
class SafeAsyncIterator:
    """安全的异步迭代器,处理异常"""
    
    async def __aiter__(self):
        return self
    
    async def __anext__(self):
        try:
            data = await self.fetch_next()
            return data
        except StopAsyncIteration:
            raise
        except Exception as e:
            print(f"迭代出错: {e}")
            raise StopAsyncIteration
    
    async def fetch_next(self):
        # 模拟可能出错的操作
        await asyncio.sleep(0.1)
        if random.random() < 0.3:
            raise ConnectionError("网络错误")
        return {"data": "ok"}

# 最佳实践:使用asynccontextmanager装饰器
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    print("获取资源")
    resource = {"id": 123}
    try:
        yield resource
    finally:
        print("释放资源")
        resource.clear()

9. 总结

通过本文的案例,你掌握了Python异步上下文管理器和异步迭代器的核心技术:

  • 异步上下文管理器的定义和使用
  • 异步HTTP连接池管理
  • 异步迭代器实现分页数据读取
  • 异步分布式锁实现
  • 异步生成器处理实时数据流
  • 最佳实践与性能对比

Python异步上下文管理器和异步迭代器让异步资源管理和数据流处理变得更加优雅和高效。这些现代Python特性将大幅提升你的异步编程能力。现在就开始在你的项目中实践这些特性吧!


本文原创,基于Python 3.12+。所有代码均在Python 3.12环境中测试通过。

Python 异步上下文管理器与迭代器:构建高效异步应用
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python 异步上下文管理器与迭代器:构建高效异步应用 https://www.taomawang.com/server/python/1780.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务