Python黑科技:构建智能自动化工作流引擎

2025-07-24 0 600

Python黑科技:构建智能自动化工作流引擎

一、架构设计

基于有向无环图(DAG)的动态工作流系统,支持1000+节点的复杂任务编排

二、核心实现

1. 工作流节点定义

from dataclasses import dataclass
from typing import Callable, List, Dict, Any

@dataclass
class WorkflowNode:
    node_id: str
    task_func: Callable[[Dict[str, Any]], Any]
    depends_on: List[str] = None
    retries: int = 0
    timeout: int = 300

    def __post_init__(self):
        self.depends_on = self.depends_on or []

class WorkflowEngine:
    def __init__(self):
        self.nodes = {}
        self.execution_log = []

    def add_node(self, node: WorkflowNode):
        if node.node_id in self.nodes:
            raise ValueError(f"Node {node.node_id} already exists")
        self.nodes[node.node_id] = node

    def validate_dag(self):
        """检查是否存在循环依赖"""
        visited = set()
        recursion_stack = set()

        def dfs(node_id):
            if node_id in recursion_stack:
                return False
            if node_id in visited:
                return True
                
            visited.add(node_id)
            recursion_stack.add(node_id)
            
            node = self.nodes[node_id]
            for dep_id in node.depends_on:
                if not dfs(dep_id):
                    return False
                    
            recursion_stack.remove(node_id)
            return True

        return all(dfs(node_id) for node_id in self.nodes)

2. 智能调度执行

from concurrent.futures import ThreadPoolExecutor
import time

class WorkflowExecutor:
    def __init__(self, max_workers=4):
        self.engine = WorkflowEngine()
        self.executor = ThreadPoolExecutor(max_workers)
        self.context = {}

    async def execute_node(self, node_id):
        node = self.engine.nodes[node_id]
        
        # 等待前置节点完成
        for dep_id in node.depends_on:
            while dep_id not in self.context:
                await asyncio.sleep(0.1)
        
        # 执行当前节点
        attempt = 0
        while attempt  node.retries:
                    raise
                time.sleep(2 ** attempt)  # 指数退避

    async def run_workflow(self):
        if not self.engine.validate_dag():
            raise ValueError("Invalid DAG with cycles")
            
        tasks = {
            node_id: asyncio.create_task(self.execute_node(node_id))
            for node_id in self.engine.nodes
        }
        await asyncio.gather(*tasks.values())

三、高级特性

1. 可视化监控

from graphviz import Digraph

def visualize_workflow(engine: WorkflowEngine):
    dot = Digraph(comment='Workflow DAG')
    
    for node_id in engine.nodes:
        dot.node(node_id)
        
    for node in engine.nodes.values():
        for dep_id in node.depends_on:
            dot.edge(dep_id, node_id)
    
    dot.render('workflow.gv', view=True)

2. 动态任务注入

class DynamicWorkflow(WorkflowEngine):
    def add_dynamic_node(self, parent_id, new_node):
        """运行时添加子节点"""
        if parent_id not in self.nodes:
            raise ValueError("Parent node not found")
            
        self.add_node(new_node)
        # 自动建立依赖关系
        new_node.depends_on.append(parent_id)
        
        # 更新执行计划
        if hasattr(self, 'executor'):
            asyncio.create_task(self.execute_node(new_node.node_id))

四、完整案例

# 定义工作流任务
def fetch_data(deps):
    print("Fetching data...")
    return {"raw": [1, 2, 3]}

def process_data(deps):
    print("Processing data...")
    return {"processed": [x*2 for x in deps["fetch_data"]["raw"]]}

def save_results(deps):
    print(f"Saving results: {deps['process_data']['processed']}")

# 创建工作流
engine = WorkflowEngine()
engine.add_node(WorkflowNode("fetch_data", fetch_data))
engine.add_node(WorkflowNode("process_data", process_data, ["fetch_data"]))
engine.add_node(WorkflowNode("save_results", save_results, ["process_data"]))

# 执行并可视化
executor = WorkflowExecutor()
asyncio.run(executor.run_workflow())
visualize_workflow(engine)
Python黑科技:构建智能自动化工作流引擎
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Python黑科技:构建智能自动化工作流引擎 https://www.taomawang.com/server/python/621.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务