A New Paradigm for Python Async Programming: A Deep Dive into Coroutines and Task Scheduling

2025-07-13

I. Core Coroutine Mechanisms

Breaking through the performance bottleneck of traditional synchronous programming:

import asyncio
from datetime import datetime

async def task(name, delay):
    print(f"[{datetime.now()}] Task {name} started")
    await asyncio.sleep(delay)
    print(f"[{datetime.now()}] Task {name} finished")
    return f"Result-{name}"

async def main():
    # Build the task list
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3))
    ]

    # Wait until the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Handle the completed task(s)
    for completed_task in done:
        print(f"First completed: {completed_task.result()}")

    # Cancel the remaining tasks
    # (use a name other than `task` here: reassigning `task` inside main would
    # make it a local variable and break the create_task() calls above)
    for pending_task in pending:
        pending_task.cancel()

# Run the event loop
asyncio.run(main())

Core advantages: non-blocking I/O, high concurrency, low resource consumption, and concise code.
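
To make the "high concurrency" claim concrete, here is a minimal sketch that reuses the task coroutine defined above and compares sequential awaits with asyncio.gather; the delays overlap in the concurrent case, so it finishes in roughly the longest single delay rather than the sum.

import time

async def compare():
    # Sequential: roughly 2 + 1 + 3 = 6 seconds in total
    start = time.perf_counter()
    await task("A", 2)
    await task("B", 1)
    await task("C", 3)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: roughly max(2, 1, 3) = 3 seconds in total
    start = time.perf_counter()
    await asyncio.gather(task("A", 2), task("B", 1), task("C", 3))
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(compare())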

II. Advanced Coroutine Patterns

1. Coroutine Communication and Synchronization

async def producer(queue, item):
    await queue.put(item)
    print(f"Produced: {item}")

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)
    
    # Start the producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) 
                for i in range(5)]
    consumers = [asyncio.create_task(consumer(queue)) 
                for _ in range(2)]
    
    await asyncio.gather(*producers)
    await queue.join()  # Wait until all queued items have been processed
    
    for c in consumers:
        c.cancel()

# Limit concurrency with a semaphore
async def limited_task(sem, url):
    async with sem:
        return await fetch(url)  # fetch() stands in for your own async I/O call

async def run_all(urls):
    sem = asyncio.Semaphore(10)  # at most 10 coroutines enter the block at once
    tasks = [limited_task(sem, url) for url in urls]
    return await asyncio.gather(*tasks)

2. Coroutine Timeouts and Retries

async def fetch_with_retry(url, retries=3, timeout=5):
    for attempt in range(retries):
        try:
            # asyncio.timeout() requires Python 3.11+
            async with asyncio.timeout(timeout):
                return await fetch(url)
        except Exception as e:
            print(f"Attempt {attempt+1} failed: {str(e)}")
            if attempt == retries - 1:
                raise
            await asyncio.sleep(1 * (attempt + 1))  # linear back-off before retrying
    return None

# Process items in batches
async def batch_process(items, batch_size=10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        tasks = [process_item(item) for item in batch]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Error processing: {result}")
            else:
                yield result
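
Because batch_process is an async generator, it is consumed with async for. A minimal usage sketch follows; process_item here is a hypothetical stand-in for your own per-item coroutine.

async def process_item(item):
    # Placeholder for real per-item work
    await asyncio.sleep(0.1)
    return item * 2

async def main():
    async for result in batch_process(list(range(25)), batch_size=10):
        print(result)

asyncio.run(main())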

III. Performance Optimization in Practice

1. Asynchronous Database Access

import asyncpg
from asyncpg.pool import Pool

async def init_pool() -> Pool:
    return await asyncpg.create_pool(
        min_size=5,
        max_size=20,
        command_timeout=60,
        host='localhost',
        user='postgres'
    )

async def fetch_users(pool: Pool):
    async with pool.acquire() as conn:
        return await conn.fetch(
            "SELECT * FROM users WHERE active = $1", True
        )

# Transaction example
async def transfer_funds(pool, from_id, to_id, amount):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id
            )
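
A short sketch of how these helpers fit together. This assumes a reachable PostgreSQL instance with users and accounts tables matching the queries above; the account ids and amount are purely illustrative.

async def main():
    pool = await init_pool()
    try:
        users = await fetch_users(pool)
        print(f"Active users: {len(users)}")
        # Move 100 from account 1 to account 2 inside a single transaction
        await transfer_funds(pool, from_id=1, to_id=2, amount=100)
    finally:
        await pool.close()

asyncio.run(main())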

2. A High-Performance HTTP Client

import aiohttp
from aiohttp import ClientSession

async def fetch_all(urls):
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(
                fetch_url(session, url)
            )
            tasks.append(task)
        return await asyncio.gather(*tasks)

async def fetch_url(session: ClientSession, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            if response.status == 200:
                return await response.json()
            return None
    except Exception as e:
        print(f"Error fetching {url}: {str(e)}")
        return None

# Rate-limited requests
async def rate_limited_fetch(sem, session, url):
    async with sem:
        await asyncio.sleep(0.5)  # wait 500 ms inside the semaphore to throttle request rate
        return await fetch_url(session, url)
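
rate_limited_fetch needs both a semaphore and an open session. A minimal sketch of wiring them together; the semaphore size and the example.com URL list are illustrative assumptions.

async def crawl(urls):
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight at any moment
    async with ClientSession() as session:
        tasks = [rate_limited_fetch(sem, session, url) for url in urls]
        return await asyncio.gather(*tasks)

results = asyncio.run(crawl([f"https://example.com/item/{i}" for i in range(20)]))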

IV. A Practical E-commerce Case Study

1. An Asynchronous Order-Processing System

class OrderProcessor:
    def __init__(self):
        self.order_queue = asyncio.Queue()
        self.is_running = False

    async def process_order(self, order):
        # Simulate order processing
        await asyncio.sleep(0.5)
        print(f"Processed order {order['id']}")
        return {"status": "completed"}

    async def save_result(self, order_id, result):
        # Placeholder: persist the result (e.g. to a database) in a real system
        await asyncio.sleep(0)

    async def worker(self):
        while self.is_running:
            order = await self.order_queue.get()
            try:
                result = await self.process_order(order)
                await self.save_result(order['id'], result)
            except Exception as e:
                print(f"Failed to process order {order['id']}: {str(e)}")
            finally:
                self.order_queue.task_done()

    async def start(self, worker_count=5):
        self.is_running = True
        self.workers = [asyncio.create_task(self.worker()) 
                       for _ in range(worker_count)]

    async def stop(self):
        # Drain the queue first: flipping is_running before join() would stop the
        # workers while orders are still queued and leave join() waiting forever
        await self.order_queue.join()
        self.is_running = False
        for worker in self.workers:
            worker.cancel()

# Usage example
async def main():
    processor = OrderProcessor()
    await processor.start()

    # Simulate incoming orders
    for i in range(100):
        await processor.order_queue.put({"id": i + 1})

    await asyncio.sleep(5)
    await processor.stop()

asyncio.run(main())

V. Production Best Practices

  • Error handling: catch exceptions inside every coroutine so a single failure cannot silently kill a worker
  • Resource limits: use semaphores to cap maximum concurrency
  • Performance monitoring: record the execution time of key coroutines (see the sketch after this list)
  • Debugging: enable asyncio debug mode to surface slow callbacks and never-awaited coroutines (see the sketch after this list)
  • Testing: use pytest-asyncio to test coroutines
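
A minimal sketch of the monitoring and debugging points above: a timing wrapper for any coroutine plus asyncio's built-in debug mode. The timed decorator and critical_job are illustrative names, not part of any library.

import asyncio
import functools
import logging
import time

def timed(coro_func):
    """Log how long a coroutine takes each time it runs."""
    @functools.wraps(coro_func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await coro_func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logging.info("%s took %.3fs", coro_func.__name__, elapsed)
    return wrapper

@timed
async def critical_job():
    await asyncio.sleep(0.2)

logging.basicConfig(level=logging.INFO)
# debug=True makes asyncio warn about slow callbacks and never-awaited coroutines
asyncio.run(critical_job(), debug=True)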