在Python异步编程中,async with和async for已经成为管理资源与处理数据流的标准方式。但很多开发者只停留在使用aiohttp或aiomysql的层面,并未理解如何自己实现异步上下文管理器。本文通过构建一个异步数据库连接池,从零实现__aenter__、__aexit__以及__aiter__、__anext__,让你彻底掌握这一核心机制。
一、为什么需要异步上下文管理器?
传统的with语句用于管理同步资源(如文件、锁),而异步上下文管理器(async with)用于管理需要等待I/O操作的资源,比如数据库连接、HTTP会话。它确保在进入和退出时执行异步操作,比如获取连接和归还连接。
同样,异步迭代器(async for)用于消费异步生成的数据序列,比如数据库游标、分页查询结果。
二、项目目标:实现一个异步数据库连接池
我们将模拟一个PostgreSQL连接池(使用asyncpg风格),但为了演示核心原理,我们使用asyncio.sleep模拟网络延迟。连接池需要支持:
- 限制最大连接数
- 获取连接时如果池为空则等待(使用asyncio.Condition)
- 归还连接后通知等待者
- 支持
async with自动获取和归还 - 支持
async for遍历查询结果(模拟游标)
三、完整代码实现
以下代码是完整的Python脚本,可直接运行(需要Python 3.7+)。所有逻辑均包含在注释中。
import asyncio
import random
from typing import Optional, List
class AsyncDatabaseConnection:
"""模拟一个数据库连接"""
def __init__(self, conn_id: int):
self.conn_id = conn_id
self.in_use = False
async def execute(self, query: str) -> str:
"""模拟执行查询,返回模拟结果"""
await asyncio.sleep(0.1) # 模拟网络延迟
return f"结果来自连接{self.conn_id}: {query}"
async def close(self):
"""模拟关闭连接"""
await asyncio.sleep(0.05)
print(f"连接 {self.conn_id} 已关闭")
class AsyncConnectionPool:
"""异步连接池,支持 async with 和 async for"""
def __init__(self, min_size: int = 2, max_size: int = 5):
self.min_size = min_size
self.max_size = max_size
self._pool: List[AsyncDatabaseConnection] = []
self._used: int = 0
self._condition = asyncio.Condition()
self._closed = False
async def _create_connection(self) -> AsyncDatabaseConnection:
"""创建新连接"""
conn_id = len(self._pool) + 1
conn = AsyncDatabaseConnection(conn_id)
self._pool.append(conn)
print(f"创建连接 {conn_id}")
return conn
async def acquire(self) -> AsyncDatabaseConnection:
"""获取连接,如果达到上限则等待"""
async with self._condition:
while True:
if self._closed:
raise RuntimeError("连接池已关闭")
# 尝试从池中获取空闲连接
for conn in self._pool:
if not conn.in_use:
conn.in_use = True
self._used += 1
return conn
# 如果没有空闲连接且未达到上限,创建新连接
if len(self._pool) 'AsyncConnectionPool':
# 初始化时创建最小连接数
for _ in range(self.min_size):
await self._create_connection()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
# 实现异步迭代器(用于遍历查询结果)
def __aiter__(self):
self._iter_index = 0
self._iter_data = [f"行{i}" for i in range(5)] # 模拟5行结果
return self
async def __anext__(self) -> str:
if self._iter_index >= len(self._iter_data):
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步获取下一行
value = self._iter_data[self._iter_index]
self._iter_index += 1
return value
# 自定义异步上下文管理器:数据库连接代理
class AsyncConnectionProxy:
"""使用 async with 自动获取/归还连接"""
def __init__(self, pool: AsyncConnectionPool):
self.pool = pool
self.conn: Optional[AsyncDatabaseConnection] = None
async def __aenter__(self) -> AsyncDatabaseConnection:
self.conn = await self.pool.acquire()
print(f"获取连接 {self.conn.conn_id}")
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.conn:
await self.pool.release(self.conn)
print(f"归还连接 {self.conn.conn_id}")
# 使用示例
async def main():
# 使用 async with 管理连接池生命周期
async with AsyncConnectionPool(min_size=2, max_size=4) as pool:
print("连接池已初始化")
# 使用连接代理执行多个查询
async with AsyncConnectionProxy(pool) as conn1:
result1 = await conn1.execute("SELECT * FROM users")
print(result1)
async with AsyncConnectionProxy(pool) as conn2:
result2 = await conn2.execute("SELECT * FROM orders")
print(result2)
# 并发获取多个连接(测试等待机制)
async def worker(task_id: int):
async with AsyncConnectionProxy(pool) as conn:
result = await conn.execute(f"任务{task_id}")
print(f"任务{task_id}: {result}")
# 同时启动6个任务,超过最大连接数4,会触发等待
tasks = [worker(i) for i in range(6)]
await asyncio.gather(*tasks)
# 演示异步迭代器
print("n遍历查询结果:")
async for row in pool:
print(f" 获取到: {row}")
print("程序结束")
if __name__ == "__main__":
asyncio.run(main())
四、核心机制详解
1. 异步上下文管理器(async with)
AsyncConnectionPool实现了__aenter__和__aexit__,使得我们可以使用async with AsyncConnectionPool() as pool来管理连接池生命周期。在进入时创建最小连接数,在退出时关闭所有连接。
AsyncConnectionProxy则实现了更细粒度的连接获取/归还:进入时调用pool.acquire(),退出时调用pool.release()。这种模式完美封装了资源的获取与释放,避免忘记归还连接。
2. 异步迭代器(async for)
__aiter__返回迭代器对象,__anext__返回一个awaitable对象,当没有更多数据时抛出StopAsyncIteration。在案例中,我们模拟了数据库游标逐行返回结果。实际项目中,可以结合asyncpg的游标实现真正的异步逐行读取。
3. 连接等待机制
使用asyncio.Condition实现生产者-消费者模式。当连接池满时,acquire方法会await condition.wait(),直到有连接被归还并调用condition.notify()。这避免了轮询,效率极高。
4. 并发测试
在main()中,我们启动了6个并发任务,但连接池最大只有4个连接。运行代码会看到前4个任务立即获取连接,后2个任务等待,直到有连接被归还。这就是连接池的核心价值。
五、运行与测试
将上述代码保存为async_pool.py,在终端运行:
$ python async_pool.py
你将看到类似输出:
创建连接 1
创建连接 2
连接池已初始化
获取连接 1
结果来自连接1: SELECT * FROM users
归还连接 1
获取连接 2
结果来自连接2: SELECT * FROM orders
归还连接 2
获取连接 1
...
任务5: 结果来自连接4: 任务5
遍历查询结果:
获取到: 行0
获取到: 行1
获取到: 行2
获取到: 行3
获取到: 行4
连接 1 已关闭
连接 2 已关闭
连接 3 已关闭
连接 4 已关闭
程序结束
注意观察连接ID的复用以及任务等待现象。
六、扩展:集成真实数据库
将AsyncDatabaseConnection替换为asyncpg的连接,即可成为真正的生产级连接池。核心结构不变:
import asyncpg
class RealConnectionPool:
def __init__(self, dsn, min_size=2, max_size=5):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self._pool = None
async def __aenter__(self):
self._pool = await asyncpg.create_pool(
dsn=self.dsn,
min_size=self.min_size,
max_size=self.max_size
)
return self
async def __aexit__(self, *args):
await self._pool.close()
async def acquire(self):
return await self._pool.acquire()
async def release(self, conn):
await self._pool.release(conn)
这样,你就拥有了一个支持async with的、真正的异步数据库连接池。
七、常见陷阱与最佳实践
- 忘记调用
release:使用async with代理模式可以避免这个问题。 - 在
__aexit__中处理异常:如果发生异常,__aexit__的参数exc_type等会包含异常信息,可以选择记录日志或重新抛出。 - 异步迭代器中的
StopAsyncIteration:必须抛出此异常来结束迭代,而不是返回None。 - 连接池关闭后的操作:在
__aexit__中设置_closed标志,并在acquire中检查,防止在关闭后继续获取连接。
八、总结
通过构建异步数据库连接池,我们深入实践了Python异步上下文管理器和异步迭代器。这不仅是语法层面的掌握,更是对资源管理、并发控制、异步设计模式的理解。当你下次使用async with aiohttp.ClientSession()时,你会明白背后发生了什么——这正是高级开发者的必备素养。
你可以将本案例的模式应用到缓存连接池、消息队列连接池等任何需要异步资源管理的场景。异步编程的威力,在于对资源的精细控制与高效利用。
本文为原创技术教程,代码基于Python 3.10测试通过。欢迎在实际项目中重构与优化。

