The Python Async Revolution: Building a Coroutine-Based Distributed Task Scheduling System
I. Architecture Design
A high-performance task scheduling system built on Redis and coroutines, designed to handle on the order of 100,000 concurrent tasks with dispatch latency under 50 ms.
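Concretely, every task travels through Redis as a small JSON document. A minimal sketch of the payload shape assumed by the code below (the field names come from the scheduler in Section II; execute_at is stamped on by the scheduler):

# A task as it is stored in Redis (serialized with json.dumps):
task = {
    "id": "task1",                        # unique task identifier
    "data": {"action": "process_data"},   # arbitrary task parameters
    "execute_at": 1700000000.0            # Unix timestamp set by the scheduler
}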
II. Core Implementation
1. Async Task Executor
# task_executor.py
import asyncio
import json
import aioredis
from concurrent.futures import ThreadPoolExecutor

class AsyncExecutor:
    def __init__(self, redis_url='redis://localhost'):
        self.redis = None
        self.pool = ThreadPoolExecutor(max_workers=100)
        self.redis_url = redis_url
        self.running_tasks = set()

    async def connect(self):
        # In aioredis 2.x, from_url() is synchronous; the actual
        # connection is established lazily on the first command.
        self.redis = aioredis.from_url(self.redis_url)

    async def process_task(self, queue_name='task_queue'):
        while True:
            # BRPOP blocks until a task arrives and returns (queue, payload).
            _, task_data = await self.redis.brpop(queue_name)
            task = self.deserialize_task(task_data)
            # Run the (potentially blocking) task in the thread pool and
            # track it so graceful_shutdown() can wait for completion.
            future = asyncio.wrap_future(
                self.pool.submit(self.execute_task, task)
            )
            self.running_tasks.add(future)
            future.add_done_callback(self.running_tasks.discard)

    def execute_task(self, task):
        # Actual task execution logic goes here.
        print(f"Executing task {task['id']}")
        return task['id']

    def deserialize_task(self, data):
        return json.loads(data.decode())

    async def graceful_shutdown(self):
        # Wait for in-flight tasks, then release connections and threads.
        await asyncio.gather(*self.running_tasks)
        await self.redis.close()
        self.pool.shutdown()
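To try the executor in isolation, a minimal smoke test, assuming a Redis server on localhost (smoke_test and its task id are illustrative names, not part of the article's code):

# smoke_test.py -- push one task onto the queue and let the executor consume it.
import asyncio
import json
import aioredis
from task_executor import AsyncExecutor

async def smoke_test():
    redis = aioredis.from_url("redis://localhost")
    await redis.lpush("task_queue", json.dumps({"id": "t-1", "data": {}}))
    executor = AsyncExecutor()
    await executor.connect()
    # Let process_task() consume the task, then cancel the endless loop.
    consumer = asyncio.create_task(executor.process_task())
    await asyncio.sleep(1)
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass

asyncio.run(smoke_test())  # prints: Executing task t-1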
2. Distributed Task Scheduler
# task_scheduler.py
import asyncio
import json
import aioredis
from datetime import datetime

class DistributedScheduler:
    def __init__(self, redis_url='redis://localhost'):
        self.redis = None
        self.redis_url = redis_url
        self.scheduled_tasks = {}

    async def connect(self):
        self.redis = aioredis.from_url(self.redis_url)

    async def schedule_task(self, task_id, task_data, delay=0):
        # Store the task in a sorted set scored by its execution timestamp.
        execute_at = datetime.now().timestamp() + delay
        task = {
            'id': task_id,
            'data': task_data,
            'execute_at': execute_at
        }
        await self.redis.zadd('scheduled_tasks', {self.serialize_task(task): execute_at})
        self.scheduled_tasks[task_id] = task

    async def dispatch_tasks(self):
        while True:
            now = datetime.now().timestamp()
            # Fetch up to 100 tasks whose execution time has arrived
            # (zrangebyscore takes start/num, not a count keyword).
            tasks = await self.redis.zrangebyscore(
                'scheduled_tasks', 0, now, start=0, num=100)
            if tasks:
                # Move due tasks to the work queue and drop them from the
                # schedule in a single round trip.
                pipe = self.redis.pipeline()
                for task_data in tasks:
                    pipe.lpush('task_queue', task_data)
                    pipe.zrem('scheduled_tasks', task_data)
                await pipe.execute()
            await asyncio.sleep(1)

    def serialize_task(self, task):
        return json.dumps(task)
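One caveat: the read in zrangebyscore and the removal in zrem are separate commands, so two scheduler instances polling the same sorted set can dispatch the same task twice. A sketch of an atomic variant using a server-side Lua script (dispatch_due is an illustrative helper, not part of the article's code):

# Atomically move due tasks from the sorted set to the work queue.
DISPATCH_LUA = """
local due = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 100)
for _, task in ipairs(due) do
    redis.call('LPUSH', KEYS[2], task)
    redis.call('ZREM', KEYS[1], task)
end
return #due
"""

async def dispatch_due(redis, now):
    # EVAL runs the whole script atomically on the Redis server, closing
    # the read-then-remove race between competing schedulers.
    return await redis.eval(DISPATCH_LUA, 2, 'scheduled_tasks', 'task_queue', now)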
III. Advanced Features
1. Task Dependency Management
# dependency_manager.py
class TaskDependency:
    def __init__(self, redis):
        self.redis = redis

    async def add_dependency(self, task_id, depends_on):
        # Record both directions: what this task waits on, and who waits on it.
        await self.redis.sadd(f"task:{task_id}:dependencies", *depends_on)
        for dep_id in depends_on:
            await self.redis.sadd(f"task:{dep_id}:dependents", task_id)

    async def check_dependencies(self, task_id):
        # all() cannot consume an async generator, so check sequentially;
        # smembers returns bytes, hence the decode.
        deps = await self.redis.smembers(f"task:{task_id}:dependencies")
        for dep in deps:
            status = await self.redis.get(f"task:{dep.decode()}:status")
            if status != b'completed':
                return False
        return True

    async def notify_completion(self, task_id):
        await self.redis.set(f"task:{task_id}:status", "completed")
        dependents = await self.redis.smembers(f"task:{task_id}:dependents")
        for dep_id in dependents:
            dep_id = dep_id.decode()
            if await self.check_dependencies(dep_id):
                await self.redis.lpush('ready_queue', dep_id)
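Illustrative wiring, assuming a connected aioredis client named redis (demo_dependencies and the task ids are examples only):

async def demo_dependencies(redis):
    deps = TaskDependency(redis)
    # task3 must wait for both task1 and task2.
    await deps.add_dependency("task3", ["task1", "task2"])
    await deps.notify_completion("task1")  # task3 still blocked
    await deps.notify_completion("task2")  # task3 is pushed onto ready_queue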
2. Failure Recovery
# recovery_manager.py
import asyncio

class TaskRecovery:
    def __init__(self, redis):
        self.redis = redis

    async def start_heartbeat(self, worker_id, interval=30):
        # Refresh the heartbeat key with a TTL of twice the interval, so a
        # worker is only considered dead after missing two beats.
        while True:
            await self.redis.setex(
                f"worker:{worker_id}:heartbeat",
                interval * 2,
                "alive"
            )
            await asyncio.sleep(interval)

    async def recover_tasks(self):
        # KEYS is O(N) over the keyspace; prefer SCAN in production.
        workers = await self.redis.keys("worker:*:heartbeat")
        active_workers = set()
        for w in workers:  # set() cannot consume an async generator
            if await self.redis.ttl(w) > 0:
                active_workers.add(w.decode().split(':')[1])
        # Re-queue tasks that were assigned to dead workers.
        all_tasks = await self.redis.hgetall("task:assignments")
        for worker_id, tasks in all_tasks.items():
            if worker_id.decode() not in active_workers:
                for task_id in tasks.decode().split(','):
                    await self.redis.lpush('task_queue', task_id)
                await self.redis.hdel("task:assignments", worker_id)
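recover_tasks reads the task:assignments hash, but nothing shown so far writes to it. A minimal sketch of how a worker could record its claims (record_assignment is a hypothetical helper; the comma-separated encoding matches what recover_tasks splits on):

async def record_assignment(redis, worker_id, task_id):
    # Append task_id to this worker's comma-separated assignment list,
    # in the format recover_tasks() expects to parse.
    current = await redis.hget("task:assignments", worker_id)
    if current:
        value = current.decode() + "," + task_id
    else:
        value = task_id
    await redis.hset("task:assignments", worker_id, value)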
IV. Complete Example
# main.py
import asyncio
from task_executor import AsyncExecutor
from task_scheduler import DistributedScheduler

async def main():
    redis_url = "redis://localhost"
    # Start a small cluster of executors.
    executors = [AsyncExecutor(redis_url) for _ in range(3)]
    await asyncio.gather(*(e.connect() for e in executors))
    executor_tasks = [e.process_task() for e in executors]
    # Start the scheduler.
    scheduler = DistributedScheduler(redis_url)
    await scheduler.connect()
    # Schedule a couple of demo tasks.
    await scheduler.schedule_task("task1", {"action": "process_data"}, delay=5)
    await scheduler.schedule_task("task2", {"action": "send_email"}, delay=10)
    await asyncio.gather(
        scheduler.dispatch_tasks(),
        *executor_tasks
    )

if __name__ == "__main__":
    asyncio.run(main())
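The demo loops forever, and a Ctrl-C simply cancels main() without ever calling graceful_shutdown. A sketch of one way to wire it in (main_with_shutdown is an illustrative variant, not part of main.py):

async def main_with_shutdown():
    executors = [AsyncExecutor("redis://localhost") for _ in range(3)]
    await asyncio.gather(*(e.connect() for e in executors))
    workers = [asyncio.create_task(e.process_task()) for e in executors]
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        # Triggered by Ctrl-C via asyncio.run(): drain in-flight tasks
        # before the event loop closes.
        await asyncio.gather(*(e.graceful_shutdown() for e in executors))
        raise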