Sooua
登录
返回文章列表
Claude Code··12 分钟阅读

动态工作流与子代理编排

动态工作流(Dynamic Workflows) 允许你:

目标:使用动态工作流编排多个子代理,处理大规模复杂任务
预计时间:40 分钟
对应官方文档:WorkflowsAgent TeamsAgent View


什么是动态工作流?

动态工作流(Dynamic Workflows) 允许你:

  • 定义多步骤任务流程
  • 自动编排子代理执行
  • 处理依赖关系和并行执行
  • 收集和汇总结果

适用场景:

  • 大规模代码库重构
  • 跨模块功能开发
  • 自动化代码审查
  • 批量数据处理

工作流架构

动态工作流执行流程

代理团队协作时序


工作流定义

YAML 格式

# workflow.yaml
name: codebase-modernization
description: 将 Python 2 代码迁移到 Python 3
 
steps:
  # 步骤 1:分析阶段
  - id: analyze
    name: 分析代码库
    agent: analyzer
    task: |
      扫描整个代码库,识别:
      1. Python 2 特有语法
      2. 废弃的库引用
      3. 字符串/字节处理问题
      4. 打印语句
    output: analysis_report.json
 
  # 步骤 2:并行修复(基于分析结果)
  - id: fix-syntax
    name: 修复语法问题
    agent: refactor-agent
    depends: [analyze]
    input: analysis_report.json
    filter: "category == 'syntax'"
    task: "修复所有 Python 2 语法问题"
 
  - id: fix-imports
    name: 更新导入语句
    agent: refactor-agent
    depends: [analyze]
    input: analysis_report.json
    filter: "category == 'imports'"
    task: "更新所有废弃的库引用"
 
  - id: fix-strings
    name: 修复字符串处理
    agent: refactor-agent
    depends: [analyze]
    input: analysis_report.json
    filter: "category == 'strings'"
    task: "修复字符串/字节处理问题"
 
  # 步骤 3:合并结果
  - id: integrate
    name: 合并修改
    agent: integrator
    depends: [fix-syntax, fix-imports, fix-strings]
    task: |
      合并所有子代理的修改,解决冲突:
      1. 检查是否有重叠修改
      2. 解决 git 冲突
      3. 确保代码可运行
 
  # 步骤 4:验证
  - id: verify
    name: 验证迁移结果
    agent: tester
    depends: [integrate]
    task: |
      全面验证:
      1. 运行 2to3 检查无遗漏
      2. 执行完整测试套件
      3. 性能基准测试
      4. 生成迁移报告
    output: verification_report.json

完整的 Agent SDK 工作流示例

#!/usr/bin/env python3
"""大规模代码重构工作流示例"""
 
import asyncio
from dataclasses import dataclass
from typing import List, Dict
from claude_agent_sdk import Agent, AgentLoop, Workflow
 
@dataclass
class RefactorTask:
    module: str
    files: List[str]
    dependencies: List[str]
    status: str = "pending"  # pending / running / done / failed
 
class CodebaseRefactorWorkflow:
    def __init__(self, project_dir: str):
        self.project_dir = project_dir
        self.agents = {}
        self.tasks: Dict[str, RefactorTask] = {}
        
    async def setup_agents(self):
        """初始化各角色代理"""
        self.agents["analyzer"] = Agent(
            model="claude-opus",
            name="code-analyzer",
            instructions="""你是代码分析专家。你的任务是:
1. 扫描代码库,识别技术债务
2. 找出重复代码和过时的模式
3. 制定安全的重构计划
4. 标记需要特别注意的边界情况
 
输出格式必须是 JSON:
{
  "issues": [{"file": "", "line": 0, "type": "", "severity": ""}],
  "refactor_plan": [{"module": "", "files": [], "strategy": ""}]
}"""
        )
        
        self.agents["refactor"] = Agent(
            model="claude-sonnet",
            name="refactor-dev",
            instructions="""你是重构专家。你的任务是:
1. 按重构计划执行修改
2. 保持向后兼容
3. 每次修改后运行测试
4. 如果测试失败,回滚并报告
 
约束:
- 不要一次修改超过 5 个文件
- 所有修改必须有对应的测试
- 不要改变 API 接口签名"""
        )
        
        self.agents["tester"] = Agent(
            model="claude-sonnet",
            name="test-expert",
            instructions="""你是测试专家。你的任务是:
1. 为重构后的代码补充测试
2. 运行全量回归测试
3. 检查测试覆盖率不下降
4. 生成测试报告"""
        )
    
    async def analyze(self) -> Dict:
        """第一阶段:代码分析"""
        print("🔍 阶段 1/4: 分析代码库...")
        analyzer = self.agents["analyzer"]
        
        result = await analyzer.run(
            f"分析项目 {self.project_dir} 的代码质量,"
            f"识别需要重构的模块和具体文件。"
            f"列出所有发现的问题并按严重程度排序。"
        )
        
        # 解析分析结果
        analysis = result.parsed_output
        for plan in analysis.get("refactor_plan", []):
            self.tasks[plan["module"]] = RefactorTask(
                module=plan["module"],
                files=plan["files"],
                dependencies=plan.get("dependencies", [])
            )
        
        return analysis
    
    async def parallel_refactor(self):
        """第二阶段:并行重构"""
        print("🔧 阶段 2/4: 并行重构...")
        
        # 按依赖拓扑排序,无依赖的先执行
        ready_tasks = [t for t in self.tasks.values() if not t.dependencies]
        
        async def refactor_module(task: RefactorTask):
            task.status = "running"
            agent = Agent(
                model="claude-sonnet",
                name=f"refactor-{task.module}",
                project_dir=self.project_dir
            )
            
            result = await agent.run(
                f"重构模块 {task.module},涉及文件: {', '.join(task.files)}\n"
                f"要求:\n"
                f"1. 保持所有现有测试通过\n"
                f"2. 添加新功能的单元测试\n"
                f"3. 更新相关文档\n"
                f"4. 遵循项目的编码规范"
            )
            
            task.status = "done" if result.success else "failed"
            return result
        
        # 并行执行所有就绪任务
        results = await asyncio.gather(*[
            refactor_module(t) for t in ready_tasks
        ])
        
        return results
    
    async def integrate(self):
        """第三阶段:合并与冲突解决"""
        print("🔄 阶段 3/4: 合并修改...")
        
        integrator = Agent(
            model="claude-opus",
            name="integrator"
        )
        
        result = await integrator.run(
            f"检查 {self.project_dir} 的所有修改,\n"
            f"解决可能的冲突,确保代码能正常编译和运行。\n"
            f"如果有冲突无法自动解决,列出需要人工介入的文件。"
        )
        
        return result
    
    async def verify(self) -> bool:
        """第四阶段:验证"""
        print("✅ 阶段 4/4: 验证结果...")
        
        tester = self.agents["tester"]
        result = await tester.run(
            f"在 {self.project_dir} 运行完整测试套件,\n"
            f"包括:单元测试、集成测试、类型检查、代码风格检查。\n"
            f"生成详细的测试报告。"
        )
        
        return result.success
    
    async def run(self):
        """运行完整工作流"""
        await self.setup_agents()
        
        try:
            # 阶段 1: 分析
            analysis = await self.analyze()
            if not self.tasks:
                print("✅ 代码库状态良好,无需重构")
                return {"status": "success", "message": "无需重构"}
            
            # 阶段 2: 并行重构
            refactor_results = await self.parallel_refactor()
            
            # 检查是否有失败
            failed = [r for r in refactor_results if not r.success]
            if failed:
                print(f"⚠️ {len(failed)} 个模块重构失败,需要人工介入")
                return {"status": "partial", "failed": failed}
            
            # 阶段 3: 合并
            await self.integrate()
            
            # 阶段 4: 验证
            if await self.verify():
                print("🎉 重构完成!所有测试通过")
                return {"status": "success", "tasks": self.tasks}
            else:
                print("❌ 测试失败,请查看报告")
                return {"status": "failed", "stage": "verify"}
                
        except Exception as e:
            print(f"💥 工作流异常: {e}")
            return {"status": "error", "message": str(e)}
 
# 使用示例
async def main():
    workflow = CodebaseRefactorWorkflow("./my-project")
    result = await workflow.run()
    print(f"最终结果: {result}")
 
if __name__ == "__main__":
    asyncio.run(main())

执行工作流

命令行

claude workflow run workflow.yaml

Agent SDK

from claude_agent_sdk import Workflow
 
workflow = Workflow.from_file("workflow.yaml")
result = workflow.run(
    project_dir="./legacy-project",
    agents={
        "analyzer": analyzer_agent,
        "refactor-agent": refactor_agent,
        "integrator": integrator_agent,
        "tester": tester_agent
    }
)
 
print(result.status)  # success / partial / failed
print(result.reports)

Agent Teams(代理团队)

概念

Agent Team = 多个 Claude Code 实例协同工作:

Team: "后端开发组"
├── Agent-1: 数据库专家
├── Agent-2: API 开发者
├── Agent-3: 测试工程师
└── Agent-4: 代码审查员

创建团队

from claude_agent_sdk import AgentTeam
 
team = AgentTeam(
    name="backend-dev",
    description="后端功能开发团队"
)
 
# 添加专家代理
team.add_agent(
    name="db-expert",
    model="claude-opus",
    instructions="数据库设计和优化专家",
    skills=["sql", "migration", "indexing"]
)
 
team.add_agent(
    name="api-dev",
    model="claude-sonnet",
    instructions="REST API 开发专家",
    skills=["fastapi", "openapi", "validation"]
)
 
team.add_agent(
    name="tester",
    model="claude-sonnet",
    instructions="测试覆盖专家",
    skills=["pytest", "mock", "coverage"]
)

分配任务

# 创建功能开发任务
task = team.create_task(
    name="实现用户订单系统",
    description=""")
 
# 自动分配子任务
assignment = team.assign(task)
# db-expert -> 设计订单表
# api-dev -> 实现订单 API
# tester -> 编写测试用例
 
# 监控进度
for update in team.track(task):
    print(f"{update.agent}: {update.status} - {update.progress}%")

Agent View(代理视图)

功能

在桌面应用或 Web 中,Agent View 提供:

  • 所有活跃会话的仪表盘
  • 实时查看每个代理的工作状态
  • 集中处理需要人类确认的请求

使用

# 启动 Agent View
claude agent-view
 
# 或创建带监控的代理
claude --agent-view --name "batch-refactor"

监控面板

┌─────────────────────────────────────────────┐
│ Agent View - 5 Active Sessions              │
├──────────────┬────────┬──────────┬──────────┤
│ Name         │ Status │ Progress │ Action   │
├──────────────┼────────┼──────────┼──────────┤
│ auth-refactor│ 🟢 Run │ 65%      │ [查看]   │
│ api-update   │ ⏸️ Wait│ -        │ [确认]   │
│ test-gen     │ 🟢 Run │ 40%      │ [查看]   │
│ doc-update   │ ✅ Done│ 100%     │ [报告]   │
│ security-scan│ 🟢 Run │ 20%      │ [查看]   │
└──────────────┴────────┴──────────┴──────────┘

实战场景

场景 1:遗留系统现代化

背景: 一个 5 年历史的 Django 2.0 项目,需要升级到 Django 5.0,同时迁移到 async ORM。

工作流配置:

# django-upgrade.yaml
name: django-2-to-5-upgrade
description: Django 2.0 → 5.0 升级工作流
 
phases:
  1-analysis:
    steps:
      - agent: code-archaeologist
        task: |
          扫描整个 Django 项目:
          1. 列出所有 Django 2.0 特有的 API 用法
          2. 识别不兼容的第三方包
          3. 标记所有同步视图需要 async 化的地方
          4. 检查数据库迁移兼容性
        output: django-upgrade-report.json
 
  2-preparation:
    steps:
      - agent: dependency-manager
        task: |
          基于分析报告:
          1. 更新 requirements.txt 到兼容版本
          2. 替换废弃的包(如 django-celery → celery)
          3. 添加 django-upgrade 工具到 CI
        depends: [1-analysis]
 
  3-parallel-migration:
    parallel:
      - agent: async-migrator
        task: |
          将所有同步视图改为 async:
          - views.py 中的函数视图 → async def
          - ORM 查询添加 async 支持
          - 中间件适配 async
        depends: [2-preparation]
        
      - agent: model-updater
        task: |
          更新模型层:
          - 替换废弃的 Field 类型
          - 更新 Meta 选项
          - 重新生成迁移文件
        depends: [2-preparation]
        
      - agent: template-modernizer
        task: |
          更新模板层:
          - 替换废弃的模板标签
          - 更新静态文件引用方式
        depends: [2-preparation]
 
  4-integration:
    steps:
      - agent: integrator
        task: |
          合并所有修改:
          1. 解决文件冲突
          2. 运行 django-upgrade --check
          3. 生成最终迁移计划
        depends: [3-parallel-migration]
 
  5-verification:
    steps:
      - agent: qa-engineer
        task: |
          全面验证:
          1. python manage.py check --deploy
          2. python manage.py test --parallel
          3. 运行 security check (bandit, safety)
          4. 性能基准测试(wrk/ab)
        depends: [4-integration]

执行:

# 创建工作树隔离
git worktree add ../django-upgrade-workspace
 
cd ../django-upgrade-workspace
 
# 启动工作流
claude workflow run django-upgrade.yaml \
  --agent-view \
  --timeout 2h \
  --notify slack:#dev-channel

预期结果:

  • 自动完成 80%+ 的机械性修改
  • 人工只需处理业务逻辑相关的复杂变更
  • 升级时间从 2 周缩短到 2 天

场景 2:微服务拆分

背景: 单体电商应用(20万行代码)需要拆分为:用户服务、订单服务、库存服务、支付服务。

执行策略:

# microservice-split.py
from claude_agent_sdk import AgentTeam, Workflow
 
team = AgentTeam(name="microservice-migration")
 
# 添加领域专家代理
team.add_agent("user-domain-expert", 
    instructions="用户领域专家:理解用户认证、权限、Profile 等业务逻辑")
team.add_agent("order-domain-expert",
    instructions="订单领域专家:理解订单状态机、优惠计算、拆单逻辑")
team.add_agent("api-designer",
    instructions="API 设计专家:设计 RESTful API,确保向后兼容")
 
# 定义工作流
workflow = Workflow([
    # 阶段1:领域分析
    {"agent": "user-domain-expert", "task": "提取用户相关代码到独立服务"},
    {"agent": "order-domain-expert", "task": "提取订单相关代码到独立服务"},
    
    # 阶段2:API 设计(并行)
    {"agent": "api-designer", "task": "设计服务间通信 API", "parallel": True},
    
    # 阶段3:数据迁移
    {"agent": "data-migrator", "task": "拆分数据库表到各自服务"},
    
    # 阶段4:集成测试
    {"agent": "tester", "task": "端到端测试所有服务"}
])
 
# 执行并监控
result = team.execute(workflow)
for event in result.stream():
    print(f"[{event.agent}] {event.status}: {event.message}")

实战:大规模重构

场景:将单体应用拆分为微服务

name: monolith-to-microservices
 
team:
  - name: analyzer
    role: 分析单体应用边界
  - name: service-creator
    role: 创建微服务骨架
  - name: api-gateway-dev
    role: 开发 API 网关
  - name: data-migrator
    role: 设计数据迁移方案
  - name: tester
    role: 验证拆分后功能
 
workflow:
  phase-1-analysis:
    - analyzer:
        task: 识别领域边界和拆分点
        output: bounded_contexts.json
 
  phase-2-parallel-implementation:
    parallel:
      - service-creator:
          task: 创建用户服务
          input: bounded_contexts.json
          context: user-service
      - service-creator:
          task: 创建订单服务
          input: bounded_contexts.json
          context: order-service
      - api-gateway-dev:
          task: 开发路由和认证
          depends: [user-service, order-service]
 
  phase-3-migration:
    - data-migrator:
        task: 执行数据迁移
        depends: phase-2
 
  phase-4-verification:
    - tester:
        task: 端到端测试
        depends: phase-3

最佳实践

✅ 推荐❌ 避免
明确每个代理的职责范围代理职责重叠
定义清晰的完成标准模糊的 "做好"
设置合理的超时无限期等待
保留人工确认点完全无人干预
记录完整的执行日志丢失审计追踪

下一步

05. 企业部署:SSO、审计、合规

分享

评论

登录 后参与讨论。

加载中…

相关文章