免费资源下载
队列状态
待处理任务: 0
延迟任务: 0
任务类型分布:
系统指标
CPU使用率: 0%
内存使用率: 0%
活跃工作者: 0
“””
return HTMLResponse(content=html)
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””WebSocket端点”””
await websocket.accept()
monitor.websocket_clients.append(websocket)
try:
while True:
await asyncio.sleep(2) # 每2秒更新一次
await monitor.broadcast_metrics()
except:
monitor.websocket_clients.remove(websocket)
使用FastAPI构建实时监控界面,通过WebSocket推送系统指标,提供可视化的监控体验。
完整示例:电商订单处理系统
# 电商订单处理任务定义
class OrderProcessingSystem:
def __init__(self):
self.scheduler = DistributedScheduler()
self.setup_tasks()
def setup_tasks(self):
"""设置订单处理任务"""
@self.scheduler.register_task
@async_task(max_retries=5)
async def validate_order(order_data: Dict) -> Dict:
"""验证订单"""
await asyncio.sleep(0.5) # 模拟验证过程
if not order_data.get('items'):
raise ValueError("订单中没有商品")
return {"valid": True, "order_id": order_data['id']}
@self.scheduler.register_task
@async_task(max_retries=3)
async def process_payment(order_id: str, amount: float) -> Dict:
"""处理支付"""
await asyncio.sleep(1) # 模拟支付处理
# 模拟支付失败率10%
import random
if random.random() < 0.1:
raise ValueError("支付处理失败")
return {"paid": True, "transaction_id": f"txn_{order_id}"}
@self.scheduler.register_task
@async_task(max_retries=2)
async def update_inventory(order_id: str, items: List[Dict]):
"""更新库存"""
await asyncio.sleep(0.8)
# 模拟库存更新
return {"inventory_updated": True}
@self.scheduler.register_task
@async_task(max_retries=3)
async def send_confirmation_email(order_id: str, email: str):
"""发送确认邮件"""
await asyncio.sleep(0.3)
# 模拟邮件发送
return {"email_sent": True}
async def process_complete_order(self, order_data: Dict):
"""处理完整订单流程"""
tasks = []
# 创建任务链
validate_task_id = await self.scheduler.schedule(
'validate_order', order_data
)
payment_task_id = await self.scheduler.schedule(
'process_payment',
order_data['id'],
order_data['total_amount']
)
inventory_task_id = await self.scheduler.schedule(
'update_inventory',
order_data['id'],
order_data['items']
)
email_task_id = await self.scheduler.schedule(
'send_confirmation_email',
order_data['id'],
order_data['customer_email']
)
# 等待所有任务完成
while True:
all_done = True
for task_id in [validate_task_id, payment_task_id,
inventory_task_id, email_task_id]:
status = self.scheduler.get_task_status(task_id)
if status.get('status') not in ['success', 'failed']:
all_done = False
break
if all_done:
break
await asyncio.sleep(1)
# 收集结果
results = {}
for task_id in [validate_task_id, payment_task_id,
inventory_task_id, email_task_id]:
status = self.scheduler.get_task_status(task_id)
results[task_id] = status
return results
# 使用示例
async def main():
order_system = OrderProcessingSystem()
# 启动调度器
scheduler_task = asyncio.create_task(
order_system.scheduler.process_tasks()
)
# 处理订单
order_data = {
'id': 'order_123',
'customer_email': 'customer@example.com',
'items': [{'product_id': 'prod_1', 'quantity': 2}],
'total_amount': 199.99
}
results = await order_system.process_complete_order(order_data)
print("订单处理结果:", results)
# 关闭系统
order_system.scheduler.worker_pool.shutdown()
scheduler_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
这个完整的电商订单处理系统展示了如何在实际业务场景中应用我们的任务调度器。