The Python Async Revolution: Building a Coroutine-Based Distributed Task Scheduling System

2025-07-26


I. Architecture Design

A high-performance task scheduling system built on Redis and coroutines, designed to handle on the order of 100,000 concurrent tasks with dispatch latency under 50 ms. Delayed tasks live in a Redis sorted set scored by their execute-at timestamp; a scheduler loop moves due tasks onto a Redis list, which a pool of coroutine-based executor workers consumes.
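
To ground the implementation, here is the Redis data model the rest of the article builds on. The key names match the code below; the enqueue_task helper is a hypothetical illustration, not part of the system itself.

# data_model.py - a sketch of the Redis structures used throughout
# (the enqueue_task helper is hypothetical, for illustration only)
import json
import redis.asyncio as redis  # aioredis has been merged into redis-py

# Key layout assumed throughout this article:
#   task_queue        - LIST  : tasks ready to run (LPUSH by producers, BRPOP by executors)
#   scheduled_tasks   - ZSET  : delayed tasks, scored by their execute-at timestamp
#   task:{id}:status  - STRING: task lifecycle state, e.g. "completed"

async def enqueue_task(r: redis.Redis, task_id: str, data: dict) -> None:
    """Push a ready-to-run task straight onto the work queue."""
    await r.lpush('task_queue', json.dumps({'id': task_id, 'data': data}))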

II. Core Implementation

1. Asynchronous Task Executor

# task_executor.py
import asyncio
import json
import redis.asyncio as aioredis  # aioredis has been merged into redis-py as redis.asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncExecutor:
    def __init__(self, redis_url='redis://localhost'):
        self.redis = None
        self.pool = ThreadPoolExecutor(max_workers=100)
        self.redis_url = redis_url
        self.running_tasks = set()

    async def connect(self):
        # from_url is synchronous; connections are established lazily on first command
        self.redis = aioredis.from_url(self.redis_url)

    async def process_task(self, queue_name='task_queue'):
        while True:
            # BRPOP blocks until a task arrives, returning (queue_name, payload)
            _, task_data = await self.redis.brpop(queue_name)
            task = self.deserialize_task(task_data)
            # Run the (potentially blocking) task body in the thread pool,
            # wrapped so the result can be awaited from asyncio
            future = asyncio.wrap_future(
                self.pool.submit(self.execute_task, task)
            )
            self.running_tasks.add(future)
            future.add_done_callback(lambda f: self.running_tasks.discard(f))

    def execute_task(self, task):
        # Actual task execution logic goes here (runs in a worker thread)
        print(f"Executing task {task['id']}")
        return task['id']

    def deserialize_task(self, data):
        return json.loads(data.decode())

    async def graceful_shutdown(self):
        # Wait for in-flight tasks, then release connections and threads
        await asyncio.gather(*self.running_tasks)
        await self.redis.close()
        self.pool.shutdown()
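
One caveat: process_task loops forever, so a real deployment needs a way to stop consuming and then drain in-flight work. A minimal sketch, assuming a Unix event loop with signal support (the run_executor wrapper is illustrative, not part of the article's code):

# shutdown_sketch.py - hypothetical wiring for graceful_shutdown
import asyncio
import signal
from task_executor import AsyncExecutor

async def run_executor():
    executor = AsyncExecutor()
    await executor.connect()
    consumer = asyncio.create_task(executor.process_task())
    # On SIGTERM: stop pulling new tasks, then drain what is in flight
    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, consumer.cancel)
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    await executor.graceful_shutdown()

if __name__ == "__main__":
    asyncio.run(run_executor())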

2. Distributed Task Scheduler

# task_scheduler.py
import asyncio
import json
import redis.asyncio as aioredis  # aioredis has been merged into redis-py
from datetime import datetime

class DistributedScheduler:
    def __init__(self, redis_url='redis://localhost'):
        self.redis = None
        self.redis_url = redis_url
        self.scheduled_tasks = {}

    async def connect(self):
        self.redis = aioredis.from_url(self.redis_url)  # synchronous; connects lazily

    async def schedule_task(self, task_id, task_data, delay=0):
        execute_at = datetime.now().timestamp() + delay
        task = {
            'id': task_id,
            'data': task_data,
            'execute_at': execute_at
        }
        await self.redis.zadd('scheduled_tasks', {self.serialize_task(task): execute_at})
        self.scheduled_tasks[task_id] = task

    async def dispatch_tasks(self):
        while True:
            now = datetime.now().timestamp()
            tasks = await self.redis.zrangebyscore(
                'scheduled_tasks', 0, now, start=0, num=100)
            
            if tasks:
                # Move due tasks to the work queue; note this read-then-remove
                # is not atomic across schedulers (see the Lua sketch below)
                pipe = self.redis.pipeline()
                for task_data in tasks:
                    pipe.lpush('task_queue', task_data)
                    pipe.zrem('scheduled_tasks', task_data)
                await pipe.execute()
            
            await asyncio.sleep(1)

    def serialize_task(self, task):
        return json.dumps(task)
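
As noted in the dispatch loop, ZRANGEBYSCORE followed by ZREM is not atomic: two scheduler instances can both read the same due task before either removes it, enqueueing it twice. A common remedy is a small Lua script, which Redis runs atomically. A sketch, assuming redis-py's script support (the script and pop_due_tasks helper are illustrative additions):

# atomic_dispatch.py - sketch of an atomic "move due tasks" step
POP_DUE_LUA = """
local due = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2])
for _, task in ipairs(due) do
    redis.call('ZREM', KEYS[1], task)
    redis.call('LPUSH', KEYS[2], task)
end
return #due
"""

async def pop_due_tasks(redis_client, now, batch=100):
    """Atomically move up to `batch` due tasks from the ZSET to the work queue."""
    script = redis_client.register_script(POP_DUE_LUA)
    return await script(keys=['scheduled_tasks', 'task_queue'], args=[now, batch])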

III. Advanced Features

1. Task Dependency Management

# dependency_manager.py
import asyncio

class TaskDependency:
    def __init__(self, redis):
        self.redis = redis

    async def add_dependency(self, task_id, depends_on):
        await self.redis.sadd(f"task:{task_id}:dependencies", *depends_on)
        for dep_id in depends_on:
            await self.redis.sadd(f"task:{dep_id}:dependents", task_id)

    async def check_dependencies(self, task_id):
        deps = await self.redis.smembers(f"task:{task_id}:dependencies")
        if not deps:
            return True
        # Fetch all dependency statuses concurrently; members come back as bytes.
        # (An await inside a plain generator passed to all() would create an
        # async generator, which all() cannot consume.)
        statuses = await asyncio.gather(
            *(self.redis.get(f"task:{dep.decode()}:status") for dep in deps)
        )
        return all(status == b'completed' for status in statuses)

    async def notify_completion(self, task_id):
        await self.redis.set(f"task:{task_id}:status", "completed")
        dependents = await self.redis.smembers(f"task:{task_id}:dependents")
        for raw_id in dependents:
            dep_id = raw_id.decode()  # smembers returns bytes
            if await self.check_dependencies(dep_id):
                await self.redis.lpush('ready_queue', dep_id)
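
Putting the pieces together, a typical dependency flow looks like this (a sketch; the demo wrapper is illustrative, not part of the article's code):

# dependency_flow.py - usage sketch: task3 runs only after task1 and task2
import asyncio
import redis.asyncio as redis
from dependency_manager import TaskDependency

async def demo():
    redis_client = redis.from_url("redis://localhost")
    deps = TaskDependency(redis_client)
    await deps.add_dependency('task3', ['task1', 'task2'])

    # Workers call notify_completion as tasks finish; once both
    # dependencies are marked completed, 'task3' lands on ready_queue
    await deps.notify_completion('task1')
    await deps.notify_completion('task2')
    ready = await redis_client.brpop('ready_queue')  # -> (b'ready_queue', b'task3')
    print(ready)

if __name__ == "__main__":
    asyncio.run(demo())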

2. Failure Recovery

# recovery_manager.py
import asyncio

class TaskRecovery:
    def __init__(self, redis):
        self.redis = redis

    async def start_heartbeat(self, worker_id, interval=30):
        while True:
            # TTL is twice the interval, so one missed beat
            # does not immediately mark the worker as dead
            await self.redis.setex(
                f"worker:{worker_id}:heartbeat",
                interval * 2,
                "alive"
            )
            await asyncio.sleep(interval)

    async def recover_tasks(self):
        # NOTE: KEYS blocks Redis while it scans the whole keyspace;
        # see the SCAN-based sketch below for a production alternative
        workers = await self.redis.keys("worker:*:heartbeat")
        # Must be a set comprehension, not set() over a generator: an await
        # inside a plain generator expression creates an async generator,
        # which set() cannot consume
        active_workers = {
            w.decode().split(':')[1]
            for w in workers
            if await self.redis.ttl(w) > 0
        }

        # Requeue tasks that were assigned to dead workers
        all_tasks = await self.redis.hgetall("task:assignments")
        for worker_id, tasks in all_tasks.items():
            if worker_id.decode() not in active_workers:
                for task_id in tasks.decode().split(','):
                    await self.redis.lpush('task_queue', task_id)
                await self.redis.hdel("task:assignments", worker_id)
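
As flagged above, KEYS is a blocking full-keyspace scan and is best avoided on a busy Redis instance; SCAN is the usual production alternative. A minimal sketch of the worker-discovery step using redis-py's scan_iter (an illustrative helper, not part of TaskRecovery):

# scan-based worker discovery (sketch)
async def active_worker_ids(redis_client):
    """Collect worker ids with a live heartbeat, without blocking Redis."""
    workers = set()
    async for key in redis_client.scan_iter(match="worker:*:heartbeat"):
        if await redis_client.ttl(key) > 0:
            workers.add(key.decode().split(':')[1])
    return workers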

IV. Complete Example

# main.py
import asyncio
from task_executor import AsyncExecutor
from task_scheduler import DistributedScheduler

async def main():
    redis_url = "redis://localhost"
    
    # Start the executor cluster
    executors = [AsyncExecutor(redis_url) for _ in range(3)]
    await asyncio.gather(*(e.connect() for e in executors))
    executor_tasks = [e.process_task() for e in executors]
    
    # Start the scheduler
    scheduler = DistributedScheduler(redis_url)
    await scheduler.connect()
    
    # Schedule a couple of sample tasks
    await scheduler.schedule_task("task1", {"action": "process_data"}, delay=5)
    await scheduler.schedule_task("task2", {"action": "send_email"}, delay=10)
    
    await asyncio.gather(
        scheduler.dispatch_tasks(),
        *executor_tasks
    )

if __name__ == "__main__":
    asyncio.run(main())
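
To observe the example while it runs, a small monitor can poll the two queues and print their depths (an illustrative addition, assuming a local Redis instance):

# monitor.py - illustrative: watch queue depths while main.py runs
import asyncio
import redis.asyncio as redis

async def monitor():
    r = redis.from_url("redis://localhost")
    while True:
        pending = await r.zcard('scheduled_tasks')   # tasks still waiting on their delay
        ready = await r.llen('task_queue')           # tasks ready for an executor
        print(f"scheduled={pending} queued={ready}")
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(monitor())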