Python黑科技:构建智能自动化工作流引擎
二、核心实现
1. 工作流节点定义
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List
@dataclass
class WorkflowNode:
    """A single task in a workflow DAG.

    A node wraps a callable plus scheduling metadata; the engine wires
    nodes together through ``depends_on`` ids.
    """

    node_id: str
    # Receives the shared context dict ({node_id: result}) and returns
    # this node's result.
    task_func: Callable[[Dict[str, Any]], Any]
    # IDs of nodes that must finish first. default_factory avoids the
    # shared-mutable-default pitfall the original worked around with None.
    depends_on: List[str] = field(default_factory=list)
    retries: int = 0    # extra attempts after the first failure
    timeout: int = 300  # seconds; not enforced in this snippet — TODO confirm intent

    def __post_init__(self):
        # Legacy callers may still pass depends_on=None explicitly;
        # normalize to an empty list.
        self.depends_on = self.depends_on or []
class WorkflowEngine:
    """Registry of workflow nodes plus DAG validation."""

    def __init__(self):
        self.nodes = {}          # node_id -> WorkflowNode
        self.execution_log = []  # not populated in this snippet — reserved

    def add_node(self, node: "WorkflowNode"):
        """Register a node; duplicate ids are rejected.

        Raises:
            ValueError: if a node with the same id already exists.
        """
        if node.node_id in self.nodes:
            raise ValueError(f"Node {node.node_id} already exists")
        self.nodes[node.node_id] = node

    def validate_dag(self):
        """Return True iff the dependency graph is a valid DAG.

        Cycle detection via DFS with a recursion stack. Also returns
        False when a node depends on an id that was never registered
        (the original raised an opaque KeyError in that case).
        """
        visited = set()
        recursion_stack = set()

        def dfs(node_id):
            if node_id in recursion_stack:
                return False  # back-edge -> cycle
            if node_id in visited:
                return True   # already proven acyclic
            node = self.nodes.get(node_id)
            if node is None:
                return False  # dangling dependency -> invalid graph
            visited.add(node_id)
            recursion_stack.add(node_id)
            for dep_id in node.depends_on:
                if not dfs(dep_id):
                    return False
            recursion_stack.remove(node_id)
            return True

        return all(dfs(node_id) for node_id in self.nodes)
2. 智能调度执行
from concurrent.futures import ThreadPoolExecutor
import time
class WorkflowExecutor:
    """Runs a WorkflowEngine's nodes concurrently with asyncio.

    Each node publishes its result into ``self.context`` keyed by its
    node_id; a node polls until all of its dependencies have published.
    """

    def __init__(self, max_workers=4):
        self.engine = WorkflowEngine()
        # Thread pool reserved for offloading blocking tasks; not used
        # by execute_node in this snippet — TODO confirm intended use.
        self.executor = ThreadPoolExecutor(max_workers)
        self.context = {}  # node_id -> task result

    async def execute_node(self, node_id):
        """Execute one node once its dependencies have finished.

        Retries up to ``node.retries`` extra times with exponential
        backoff; re-raises the last exception when retries are exhausted.
        """
        node = self.engine.nodes[node_id]
        # Wait for every prerequisite to publish its result.
        for dep_id in node.depends_on:
            while dep_id not in self.context:
                await asyncio.sleep(0.1)
        # Execute with bounded retry + exponential backoff.
        # NOTE(review): the original line here was garbled
        # ("while attempt node.retries:"); reconstructed as a standard
        # retry loop consistent with the `retries` field.
        attempt = 0
        while True:
            try:
                result = node.task_func(self.context)
                self.context[node_id] = result
                return result
            except Exception:
                attempt += 1
                if attempt > node.retries:
                    raise
                # Non-blocking backoff; the original used time.sleep,
                # which would stall the whole event loop.
                await asyncio.sleep(2 ** attempt)

    async def run_workflow(self):
        """Validate the DAG, then run every registered node concurrently.

        Raises:
            ValueError: if the engine's graph fails validation.
        """
        if not self.engine.validate_dag():
            raise ValueError("Invalid DAG with cycles")
        tasks = {
            node_id: asyncio.create_task(self.execute_node(node_id))
            for node_id in self.engine.nodes
        }
        await asyncio.gather(*tasks.values())
三、高级特性
1. 可视化监控
from graphviz import Digraph
def visualize_workflow(engine: "WorkflowEngine"):
    """Render the engine's dependency graph to 'workflow.gv' via graphviz.

    Draws one graph node per workflow node and one edge per dependency
    (dep -> dependent), then renders and opens the file.
    """
    dot = Digraph(comment='Workflow DAG')
    for node_id in engine.nodes:
        dot.node(node_id)
    for node in engine.nodes.values():
        for dep_id in node.depends_on:
            # BUG FIX: the original used the stale `node_id` left over
            # from the previous loop, so every edge pointed at the last
            # node iterated; use this node's own id.
            dot.edge(dep_id, node.node_id)
    dot.render('workflow.gv', view=True)
2. 动态任务注入
class DynamicWorkflow(WorkflowEngine):
    """WorkflowEngine variant that supports runtime task injection."""

    def add_dynamic_node(self, parent_id, new_node):
        """Register ``new_node`` as a child of ``parent_id`` at runtime.

        If an executor has been attached to this engine (as
        ``self.executor``), the new node is scheduled immediately on the
        running event loop.

        Raises:
            ValueError: if ``parent_id`` is not a registered node.
        """
        if parent_id not in self.nodes:
            raise ValueError("Parent node not found")
        self.add_node(new_node)
        # Make the new node wait for its parent before executing.
        new_node.depends_on.append(parent_id)
        # BUG FIX: the original called self.execute_node(), which does
        # not exist on the engine — execute_node lives on the executor.
        # NOTE(review): create_task requires a running event loop;
        # assumes this is called from within the running workflow.
        if hasattr(self, 'executor'):
            asyncio.create_task(self.executor.execute_node(new_node.node_id))
四、完整案例
# 定义工作流任务
def fetch_data(deps):
    """Entry task: produce the initial raw dataset."""
    print("Fetching data...")
    raw = list(range(1, 4))
    return {"raw": raw}
def process_data(deps):
    """Double every raw value produced by the fetch step."""
    print("Processing data...")
    raw_values = deps["fetch_data"]["raw"]
    doubled = []
    for value in raw_values:
        doubled.append(value * 2)
    return {"processed": doubled}
def save_results(deps):
    """Terminal task: report the processed results (stand-in for persistence)."""
    processed = deps['process_data']['processed']
    print(f"Saving results: {processed}")
# Build the workflow graph.
engine = WorkflowEngine()
engine.add_node(WorkflowNode("fetch_data", fetch_data))
engine.add_node(WorkflowNode("process_data", process_data, ["fetch_data"]))
engine.add_node(WorkflowNode("save_results", save_results, ["process_data"]))

# Execute and visualize.
executor = WorkflowExecutor()
# BUG FIX: WorkflowExecutor() creates its own empty engine, so the
# original demo ran zero nodes; attach the populated engine first.
executor.engine = engine
asyncio.run(executor.run_workflow())
visualize_workflow(engine)