Python异步编程实战:从协程到分布式任务队列

2025-07-12 0 243

Python异步编程实战:从协程到分布式任务队列

一、异步编程核心概念

Python 3.10+的异步生态已经成熟,主要包含三大组件:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def fetch_data(url):
    print(f"开始请求 {url}")
    await asyncio.sleep(1)  # 模拟IO操作
    return f"{url}的响应数据"

async def main():
    # 创建任务列表
    tasks = [
        asyncio.create_task(fetch_data(f"https://api.com/data/{i}"))
        for i in range(5)
    ]
    
    # 并行执行并获取结果
    results = await asyncio.gather(*tasks)
    print(results)

# 运行事件循环
asyncio.run(main())

异步编程优势:高并发低资源消耗代码简洁

二、高级协程模式

1. 协程超时控制

async def call_with_timeout(coro, timeout=3):
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        print("操作超时")
        return None

# 使用示例
result = await call_with_timeout(fetch_data("https://slow-api.com"))

2. 协程并发限制

from asyncio import Semaphore

async def limited_fetch(url, sem):
    async with sem:
        return await fetch_data(url)

async def run_many(urls, concurrency=5):
    sem = Semaphore(concurrency)
    tasks = [limited_fetch(url, sem) for url in urls]
    return await asyncio.gather(*tasks)

三、分布式任务队列实战

1. Celery + Redis实现

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        result = heavy_computation(data)
        return result
    except Exception as e:
        self.retry(exc=e, countdown=2 ** self.request.retries)

# 调用任务
process_data.delay({"id": 1, "value": "test"})

2. 异步任务监控

from celery.result import AsyncResult

def check_task_status(task_id):
    result = AsyncResult(task_id)
    return {
        'ready': result.ready(),
        'successful': result.successful(),
        'value': result.result if result.ready() else None
    }

# 使用Flask提供状态查询API
@app.route('/task/<task_id>')
def get_status(task_id):
    return jsonify(check_task_status(task_id))

四、性能对比数据

场景 同步方式 异步方式
100个HTTP请求 45.2秒 3.8秒
CPU占用率 75-85% 15-25%
内存消耗 420MB 110MB

测试环境:Python 3.10 / 4核CPU / 8GB内存

五、电商订单处理案例

1. 异步订单流水线

async def order_pipeline(order_id):
    # 并行执行多个步骤
    payment, inventory, logistics = await asyncio.gather(
        process_payment(order_id),
        check_inventory(order_id),
        prepare_logistics(order_id)
    )
    
    if all([payment.success, inventory.available]):
        await dispatch_order(order_id)
        return {"status": "completed"}
    return {"status": "failed"}

# 批量处理订单
async def process_batch(order_ids):
    return await asyncio.gather(
        *[order_pipeline(oid) for oid in order_ids]
    )

六、错误处理最佳实践

  • 重试机制:使用tenacity库实现指数退避
  • 熔断模式:circuitbreaker防止级联故障
  • 日志记录:结构化日志便于分析
  • 监控告警:Prometheus+Granfa监控关键指标
Python异步编程实战:从协程到分布式任务队列
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python异步编程实战:从协程到分布式任务队列 https://www.taomawang.com/server/python/262.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务