A Complete Guide to Asynchronous Programming in Python: From Coroutines to High-Performance Applications | Advanced Python Programming Tutorial

2025-08-24

Gain a deep command of Python's asyncio library and build high-performance asynchronous applications

Introduction to Asynchronous Programming

Asynchronous programming is an essential skill in modern Python development, especially for I/O-bound applications. Through the asyncio library, Python provides native support for asynchronous programming, letting developers write high-performance, non-blocking code.

Unlike multithreading and multiprocessing, asynchronous programming handles many tasks on a single-threaded event loop: concurrency comes from suspending and resuming coroutines, which avoids the overhead and complexity of thread switching.
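
To see the single-thread model in action, the short sketch below (names are illustrative) prints the thread identifier from several concurrently running coroutines; they all report the same thread:

import asyncio
import threading

async def whoami(name):
    # Every coroutine runs on the same thread as the event loop itself
    print(f"{name} runs on thread {threading.get_ident()}")
    await asyncio.sleep(0.1)

async def main():
    await asyncio.gather(whoami("A"), whoami("B"), whoami("C"))

asyncio.run(main())  # all three tasks print the same thread id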

Coroutine Basics

Coroutines are the core building block of asynchronous programming: functions whose execution can be suspended and resumed.

Defining a Coroutine

import asyncio

# Define a coroutine with async def
async def simple_coroutine():
    print("Hello")
    await asyncio.sleep(1)  # simulate an I/O operation
    print("World")

# Run the coroutine from synchronous code
asyncio.run(simple_coroutine())
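
One detail worth noting: calling a coroutine function does not execute its body. It only creates a coroutine object, which must be awaited or handed to the event loop:

coro = simple_coroutine()  # nothing printed yet: this only creates a coroutine object
print(type(coro))          # <class 'coroutine'>
asyncio.run(coro)          # now the body actually executes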

The await Expression

async def fetch_data():
    print("Starting data fetch")
    await asyncio.sleep(2)  # simulate a network request
    print("Data fetch complete")
    return {"data": 123}

async def main():
    result = await fetch_data()  # suspend here until the coroutine finishes
    print(f"Received data: {result}")

asyncio.run(main())

Running Multiple Coroutines Concurrently

async def task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"{name}-result"

async def main():
    # Run several tasks concurrently; total time is ~3s (the longest task), not 6s
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(f"All tasks finished: {results}")

asyncio.run(main())

The Event Loop in Depth

The event loop is the heart of asyncio; it schedules and executes coroutine tasks.

Getting and Managing the Event Loop

async def example_loop():
    loop = asyncio.get_running_loop()  # get the currently running event loop
    print(f"Event loop: {loop}")
    
    # Run blocking work in the default thread-pool executor
    result = await loop.run_in_executor(None, lambda: "blocking task result")
    print(f"Executor result: {result}")

asyncio.run(example_loop())

Custom Event Loops

# On Unix, uvloop is a faster drop-in replacement for the default loop.
# The policy must be set BEFORE asyncio.run() creates the loop; setting it
# from inside an already-running coroutine has no effect on that loop.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    print("uvloop is not available; falling back to the default event loop")

async def custom_loop_example():
    await asyncio.sleep(1)
    print("Running under the configured event loop policy")

asyncio.run(custom_loop_example())

Tasks and Future Objects

A Task is a subclass of Future, used to schedule a coroutine's execution on the event loop.

Creating and Managing Tasks

async def long_running_task(seconds):
    print(f"Task started, needs {seconds} seconds")
    await asyncio.sleep(seconds)
    print("Task finished")
    return seconds

async def main():
    # Create tasks without awaiting them immediately; keeping references
    # also prevents the tasks from being garbage-collected mid-flight
    task1 = asyncio.create_task(long_running_task(2))
    task2 = asyncio.create_task(long_running_task(3))
    
    # Do other work while the tasks run in the background
    print("Tasks created, doing other work...")
    await asyncio.sleep(1)
    print("Other work done")
    
    # Now wait for both tasks to complete
    results = await asyncio.gather(task1, task2)
    print(f"Task results: {results}")

asyncio.run(main())

Working with Future Objects

async def set_future_result(future, delay, value):
    await asyncio.sleep(delay)
    future.set_result(value)

async def future_example():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    future = loop.create_future()
    
    # Schedule a task that will set the future's result
    asyncio.create_task(set_future_result(future, 2, "future result"))
    
    print("Waiting for the future...")
    result = await future  # suspend until the result is set
    print(f"Future result: {result}")

asyncio.run(future_example())

Timeout Handling

async def might_timeout(delay):
    await asyncio.sleep(delay)
    return "completed successfully"

async def main():
    try:
        # Give the task at most 3 seconds
        result = await asyncio.wait_for(might_timeout(5), timeout=3)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())
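
On Python 3.11 and later, the asyncio.timeout() context manager is an alternative to wait_for. A minimal sketch, reusing might_timeout from above:

async def main():
    try:
        async with asyncio.timeout(3):  # Python 3.11+
            result = await might_timeout(5)
            print(f"Result: {result}")
    except TimeoutError:  # asyncio.TimeoutError is an alias of TimeoutError since 3.11
        print("Task timed out")

asyncio.run(main())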

Asynchronous I/O Operations

The asyncio ecosystem supports asynchronous I/O for both network communication and file access; the examples below use the third-party aiohttp and aiofiles libraries.

Asynchronous Network Requests

import aiohttp
import asyncio

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"Error: {e}"

async def main():
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/html"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"{url}: received {len(content)} characters")

asyncio.run(main())

Asynchronous File Operations

import aiofiles
import asyncio

async def async_file_operations():
    # Write a file asynchronously
    async with aiofiles.open('test.txt', 'w') as f:
        await f.write('Hello, Async World!')
    
    # Read it back asynchronously
    async with aiofiles.open('test.txt', 'r') as f:
        content = await f.read()
        print(f"File contents: {content}")

asyncio.run(async_file_operations())

Asynchronous Database Operations

import asyncpg
import asyncio

async def async_database_example():
    try:
        # Connect to the database
        conn = await asyncpg.connect(
            user='user', password='password',
            database='test', host='localhost'
        )
        
        # Run a parameterized query
        result = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
        print(f"Found {len(result)} active users")
        
        # Close the connection
        await conn.close()
    except Exception as e:
        print(f"Database error: {e}")

# Requires a running PostgreSQL instance:
# asyncio.run(async_database_example())
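
For anything beyond a one-off script, a connection pool is usually preferable to opening a new connection per query. A minimal sketch using asyncpg's built-in pooling, with the same hypothetical credentials as above:

async def pool_example():
    # Create the pool once and reuse its connections across queries
    pool = await asyncpg.create_pool(
        user='user', password='password',
        database='test', host='localhost',
        min_size=1, max_size=10
    )
    async with pool.acquire() as conn:
        result = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
        print(f"Found {len(result)} active users")
    await pool.close()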

Asynchronous Synchronization Primitives

asyncio provides several synchronization primitives for coordinating asynchronous tasks.

Async Locks

import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:  # hold the lock for the whole read-modify-write
            current = self.value
            await asyncio.sleep(0.1)  # simulate an I/O operation
            self.value = current + 1

async def main():
    counter = AsyncCounter()
    
    # Run ten increments concurrently
    tasks = [asyncio.create_task(counter.increment()) for _ in range(10)]
    await asyncio.gather(*tasks)
    
    print(f"Final counter value: {counter.value}")  # should be 10

asyncio.run(main())
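
The lock matters because of the await in the middle of the read-modify-write: without it, every task reads the counter before any task has written back, and updates are lost. A counter-example that reuses AsyncCounter but bypasses its lock:

async def unsafe_increment(counter):
    current = counter.value
    await asyncio.sleep(0.1)  # other tasks run here and read the same value
    counter.value = current + 1

async def main():
    counter = AsyncCounter()
    await asyncio.gather(*(unsafe_increment(counter) for _ in range(10)))
    print(f"Final value without the lock: {counter.value}")  # 1, not 10

asyncio.run(main())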

Semaphores

async def limited_resource(semaphore, name):
    async with semaphore:
        print(f"{name} acquired the resource")
        await asyncio.sleep(2)  # simulate using the resource
        print(f"{name} released the resource")

async def main():
    semaphore = asyncio.Semaphore(3)  # allow 3 concurrent holders
    
    tasks = []
    for i in range(10):
        tasks.append(limited_resource(semaphore, f"task-{i}"))
    
    await asyncio.gather(*tasks)

asyncio.run(main())

Events and Condition Variables

async def waiter(event, name):
    print(f"{name} is waiting for the event")
    await event.wait()
    print(f"{name} saw the event")

async def setter(event):
    await asyncio.sleep(2)
    print("Setting the event")
    event.set()

async def main():
    event = asyncio.Event()
    
    await asyncio.gather(
        waiter(event, "A"),
        waiter(event, "B"),
        setter(event)
    )

asyncio.run(main())
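
The example above covers events; for the condition-variable half of this section, here is a minimal asyncio.Condition sketch (the producer/consumer names are illustrative) in which a consumer waits until a producer publishes a value:

async def consumer(condition, shared):
    async with condition:
        await condition.wait_for(lambda: shared.get("ready"))
        print(f"Consumer got: {shared['value']}")

async def producer(condition, shared):
    await asyncio.sleep(1)
    async with condition:  # the lock must be held to notify
        shared["value"] = 42
        shared["ready"] = True
        condition.notify_all()

async def main():
    condition = asyncio.Condition()
    shared = {}
    await asyncio.gather(consumer(condition, shared), producer(condition, shared))

asyncio.run(main())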

A Practical Example: An Asynchronous Web Crawler

The following complete asynchronous web-crawler example shows asyncio applied in practice:

import aiohttp
import asyncio
from urllib.parse import urljoin, urlparse
import time

class AsyncWebCrawler:
    def __init__(self, max_concurrent=10):
        self.visited = set()
        self.to_crawl = asyncio.Queue()
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
    
    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=5) as response:
                    if response.status == 200:
                        return await response.text()
                    return None
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
    
    def extract_links(self, html, base_url):
        # Simplified link extraction; a real crawler should use an HTML parser
        import re
        links = re.findall(r'href="([^"]+)"', html)
        full_links = set()
        for link in links:
            full_link = urljoin(base_url, link)
            # Stay on the same host as the page we fetched
            if urlparse(full_link).netloc == urlparse(base_url).netloc:
                full_links.add(full_link)
        return full_links
    
    async def worker(self, name):
        while True:
            try:
                url = await self.to_crawl.get()
                
                if url in self.visited:
                    self.to_crawl.task_done()
                    continue
                
                self.visited.add(url)
                print(f"{name} crawling: {url}")
                
                html = await self.fetch(url)
                if html:
                    links = self.extract_links(html, url)
                    for link in links:
                        if link not in self.visited:
                            await self.to_crawl.put(link)
                
                self.to_crawl.task_done()
            except asyncio.CancelledError:
                break
    
    async def crawl(self, start_url, max_pages=50):
        self.session = aiohttp.ClientSession()
        await self.to_crawl.put(start_url)
        
        # Create worker tasks (not threads: everything runs on one event loop)
        workers = [
            asyncio.create_task(self.worker(f"Worker-{i}"))
            for i in range(self.max_concurrent)
        ]
        
        # Wait until the queue drains or the page limit is reached
        while len(self.visited) < max_pages:
            await asyncio.sleep(0.5)
            if self.to_crawl.empty():
                break
        
        # Cancel the workers and wait for them to exit; joining the queue
        # after cancellation would hang if unprocessed URLs remained
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
        await self.session.close()
        
        return list(self.visited)[:max_pages]

async def main():
    crawler = AsyncWebCrawler(max_concurrent=5)
    start_time = time.time()
    
    try:
        results = await crawler.crawl("https://httpbin.org", max_pages=20)
        print(f"\nCrawl finished! Found {len(results)} pages")
        print("Sample pages:", results[:5])
    except Exception as e:
        print(f"Crawler error: {e}")
    
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices and Performance Optimization

Following these practices leads to more efficient and more reliable asynchronous code:

1. Avoid Blocking Calls

import asyncio
import time

# Wrong - a blocking call inside a coroutine stalls the whole event loop
async def bad_example():
    time.sleep(2)  # blocks every other task on the loop
    
# Right - use the async equivalent, or run_in_executor for blocking code
async def good_example():
    await asyncio.sleep(2)  # non-blocking
    
async def blocking_task_offloaded():
    loop = asyncio.get_running_loop()
    # Push the blocking call onto the default thread-pool executor
    await loop.run_in_executor(None, lambda: time.sleep(2))
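
Note that a thread executor helps with blocking I/O but not with pure-Python CPU-bound work, which the GIL serializes. A sketch that offloads to a process pool instead (heavy_computation is an illustrative stand-in):

from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    # Top-level function so it can be pickled for the worker process
    return sum(i * i for i in range(n))

async def cpu_bound_properly():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)
        print(f"Result: {result}")

if __name__ == "__main__":  # required for process pools on spawn-based platforms
    asyncio.run(cpu_bound_properly())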

2. Keep Concurrency Bounded

async def bounded_gather(coros, concurrency_limit):
    semaphore = asyncio.Semaphore(concurrency_limit)
    
    async def bounded(coro):
        async with semaphore:
            return await coro
    
    return await asyncio.gather(*[bounded(coro) for coro in coros])

# Usage example
async def example_usage():
    coros = [asyncio.sleep(1) for _ in range(100)]  # 100 coroutine objects
    await bounded_gather(coros, 10)  # at most 10 run at a time

3. Handle Errors and Exceptions Properly

async def robust_task():
    try:
        result = await might_fail()  # might_fail() is a placeholder for real work
        return {"success": True, "data": result}
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise  # always re-raise cancellation
    except Exception as e:
        print(f"Task failed: {e}")
        return {"success": False, "error": str(e)}

async def main():
    tasks = [robust_task() for _ in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for result in results:
        if isinstance(result, Exception):
            print(f"Exception result: {result}")
        else:
            print(f"Normal result: {result}")

4. Use Structured Concurrency

async def structured_concurrency_example():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(asyncio.sleep(1))
            task2 = tg.create_task(asyncio.sleep(2))
        
        print("All tasks finished")
    except* Exception as eg:
        print(f"Exceptions in the task group: {eg.exceptions}")

# asyncio.TaskGroup (and the except* syntax) require Python 3.11+

5. Monitor and Debug Async Code

import logging
import time

logging.basicConfig(level=logging.DEBUG)

async def debug_example():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)  # enable asyncio debug mode
    
    # Warn about callbacks that block the loop longer than this (seconds)
    loop.slow_callback_duration = 0.1
    
    time.sleep(0.2)  # a blocking call like this triggers the slow-callback warning

# Debug mode can also be enabled with asyncio.run(..., debug=True)
# or the PYTHONASYNCIODEBUG=1 environment variable
asyncio.run(debug_example(), debug=True)
