目标:使用动态工作流编排多个子代理,处理大规模复杂任务
预计时间:40 分钟
对应官方文档:Workflows、Agent Teams、Agent View
什么是动态工作流?
动态工作流(Dynamic Workflows) 允许你:
- 定义多步骤任务流程
- 自动编排子代理执行
- 处理依赖关系和并行执行
- 收集和汇总结果
适用场景:
- 大规模代码库重构
- 跨模块功能开发
- 自动化代码审查
- 批量数据处理
工作流架构
动态工作流执行流程
代理团队协作时序
工作流定义
YAML 格式
# workflow.yaml
name: codebase-modernization
description: 将 Python 2 代码迁移到 Python 3
steps:
# 步骤 1:分析阶段
- id: analyze
name: 分析代码库
agent: analyzer
task: |
扫描整个代码库,识别:
1. Python 2 特有语法
2. 废弃的库引用
3. 字符串/字节处理问题
4. 打印语句
output: analysis_report.json
# 步骤 2:并行修复(基于分析结果)
- id: fix-syntax
name: 修复语法问题
agent: refactor-agent
depends: [analyze]
input: analysis_report.json
filter: "category == 'syntax'"
task: "修复所有 Python 2 语法问题"
- id: fix-imports
name: 更新导入语句
agent: refactor-agent
depends: [analyze]
input: analysis_report.json
filter: "category == 'imports'"
task: "更新所有废弃的库引用"
- id: fix-strings
name: 修复字符串处理
agent: refactor-agent
depends: [analyze]
input: analysis_report.json
filter: "category == 'strings'"
task: "修复字符串/字节处理问题"
# 步骤 3:合并结果
- id: integrate
name: 合并修改
agent: integrator
depends: [fix-syntax, fix-imports, fix-strings]
task: |
合并所有子代理的修改,解决冲突:
1. 检查是否有重叠修改
2. 解决 git 冲突
3. 确保代码可运行
# 步骤 4:验证
- id: verify
name: 验证迁移结果
agent: tester
depends: [integrate]
task: |
全面验证:
1. 运行 2to3 检查无遗漏
2. 执行完整测试套件
3. 性能基准测试
4. 生成迁移报告
output: verification_report.json完整的 Agent SDK 工作流示例
#!/usr/bin/env python3
"""大规模代码重构工作流示例"""
import asyncio
from dataclasses import dataclass
from typing import List, Dict
from claude_agent_sdk import Agent, AgentLoop, Workflow
@dataclass
class RefactorTask:
module: str
files: List[str]
dependencies: List[str]
status: str = "pending" # pending / running / done / failed
class CodebaseRefactorWorkflow:
def __init__(self, project_dir: str):
self.project_dir = project_dir
self.agents = {}
self.tasks: Dict[str, RefactorTask] = {}
async def setup_agents(self):
"""初始化各角色代理"""
self.agents["analyzer"] = Agent(
model="claude-opus",
name="code-analyzer",
instructions="""你是代码分析专家。你的任务是:
1. 扫描代码库,识别技术债务
2. 找出重复代码和过时的模式
3. 制定安全的重构计划
4. 标记需要特别注意的边界情况
输出格式必须是 JSON:
{
"issues": [{"file": "", "line": 0, "type": "", "severity": ""}],
"refactor_plan": [{"module": "", "files": [], "strategy": ""}]
}"""
)
self.agents["refactor"] = Agent(
model="claude-sonnet",
name="refactor-dev",
instructions="""你是重构专家。你的任务是:
1. 按重构计划执行修改
2. 保持向后兼容
3. 每次修改后运行测试
4. 如果测试失败,回滚并报告
约束:
- 不要一次修改超过 5 个文件
- 所有修改必须有对应的测试
- 不要改变 API 接口签名"""
)
self.agents["tester"] = Agent(
model="claude-sonnet",
name="test-expert",
instructions="""你是测试专家。你的任务是:
1. 为重构后的代码补充测试
2. 运行全量回归测试
3. 检查测试覆盖率不下降
4. 生成测试报告"""
)
async def analyze(self) -> Dict:
"""第一阶段:代码分析"""
print("🔍 阶段 1/4: 分析代码库...")
analyzer = self.agents["analyzer"]
result = await analyzer.run(
f"分析项目 {self.project_dir} 的代码质量,"
f"识别需要重构的模块和具体文件。"
f"列出所有发现的问题并按严重程度排序。"
)
# 解析分析结果
analysis = result.parsed_output
for plan in analysis.get("refactor_plan", []):
self.tasks[plan["module"]] = RefactorTask(
module=plan["module"],
files=plan["files"],
dependencies=plan.get("dependencies", [])
)
return analysis
async def parallel_refactor(self):
"""第二阶段:并行重构"""
print("🔧 阶段 2/4: 并行重构...")
# 按依赖拓扑排序,无依赖的先执行
ready_tasks = [t for t in self.tasks.values() if not t.dependencies]
async def refactor_module(task: RefactorTask):
task.status = "running"
agent = Agent(
model="claude-sonnet",
name=f"refactor-{task.module}",
project_dir=self.project_dir
)
result = await agent.run(
f"重构模块 {task.module},涉及文件: {', '.join(task.files)}\n"
f"要求:\n"
f"1. 保持所有现有测试通过\n"
f"2. 添加新功能的单元测试\n"
f"3. 更新相关文档\n"
f"4. 遵循项目的编码规范"
)
task.status = "done" if result.success else "failed"
return result
# 并行执行所有就绪任务
results = await asyncio.gather(*[
refactor_module(t) for t in ready_tasks
])
return results
async def integrate(self):
"""第三阶段:合并与冲突解决"""
print("🔄 阶段 3/4: 合并修改...")
integrator = Agent(
model="claude-opus",
name="integrator"
)
result = await integrator.run(
f"检查 {self.project_dir} 的所有修改,\n"
f"解决可能的冲突,确保代码能正常编译和运行。\n"
f"如果有冲突无法自动解决,列出需要人工介入的文件。"
)
return result
async def verify(self) -> bool:
"""第四阶段:验证"""
print("✅ 阶段 4/4: 验证结果...")
tester = self.agents["tester"]
result = await tester.run(
f"在 {self.project_dir} 运行完整测试套件,\n"
f"包括:单元测试、集成测试、类型检查、代码风格检查。\n"
f"生成详细的测试报告。"
)
return result.success
async def run(self):
"""运行完整工作流"""
await self.setup_agents()
try:
# 阶段 1: 分析
analysis = await self.analyze()
if not self.tasks:
print("✅ 代码库状态良好,无需重构")
return {"status": "success", "message": "无需重构"}
# 阶段 2: 并行重构
refactor_results = await self.parallel_refactor()
# 检查是否有失败
failed = [r for r in refactor_results if not r.success]
if failed:
print(f"⚠️ {len(failed)} 个模块重构失败,需要人工介入")
return {"status": "partial", "failed": failed}
# 阶段 3: 合并
await self.integrate()
# 阶段 4: 验证
if await self.verify():
print("🎉 重构完成!所有测试通过")
return {"status": "success", "tasks": self.tasks}
else:
print("❌ 测试失败,请查看报告")
return {"status": "failed", "stage": "verify"}
except Exception as e:
print(f"💥 工作流异常: {e}")
return {"status": "error", "message": str(e)}
# 使用示例
async def main():
workflow = CodebaseRefactorWorkflow("./my-project")
result = await workflow.run()
print(f"最终结果: {result}")
if __name__ == "__main__":
asyncio.run(main())执行工作流
命令行
claude workflow run workflow.yamlAgent SDK
from claude_agent_sdk import Workflow
workflow = Workflow.from_file("workflow.yaml")
result = workflow.run(
project_dir="./legacy-project",
agents={
"analyzer": analyzer_agent,
"refactor-agent": refactor_agent,
"integrator": integrator_agent,
"tester": tester_agent
}
)
print(result.status) # success / partial / failed
print(result.reports)Agent Teams(代理团队)
概念
Agent Team = 多个 Claude Code 实例协同工作:
Team: "后端开发组"
├── Agent-1: 数据库专家
├── Agent-2: API 开发者
├── Agent-3: 测试工程师
└── Agent-4: 代码审查员
创建团队
from claude_agent_sdk import AgentTeam
team = AgentTeam(
name="backend-dev",
description="后端功能开发团队"
)
# 添加专家代理
team.add_agent(
name="db-expert",
model="claude-opus",
instructions="数据库设计和优化专家",
skills=["sql", "migration", "indexing"]
)
team.add_agent(
name="api-dev",
model="claude-sonnet",
instructions="REST API 开发专家",
skills=["fastapi", "openapi", "validation"]
)
team.add_agent(
name="tester",
model="claude-sonnet",
instructions="测试覆盖专家",
skills=["pytest", "mock", "coverage"]
)分配任务
# 创建功能开发任务
task = team.create_task(
name="实现用户订单系统",
description=""")
# 自动分配子任务
assignment = team.assign(task)
# db-expert -> 设计订单表
# api-dev -> 实现订单 API
# tester -> 编写测试用例
# 监控进度
for update in team.track(task):
print(f"{update.agent}: {update.status} - {update.progress}%")Agent View(代理视图)
功能
在桌面应用或 Web 中,Agent View 提供:
- 所有活跃会话的仪表盘
- 实时查看每个代理的工作状态
- 集中处理需要人类确认的请求
使用
# 启动 Agent View
claude agent-view
# 或创建带监控的代理
claude --agent-view --name "batch-refactor"监控面板
┌─────────────────────────────────────────────┐
│ Agent View - 5 Active Sessions │
├──────────────┬────────┬──────────┬──────────┤
│ Name │ Status │ Progress │ Action │
├──────────────┼────────┼──────────┼──────────┤
│ auth-refactor│ 🟢 Run │ 65% │ [查看] │
│ api-update │ ⏸️ Wait│ - │ [确认] │
│ test-gen │ 🟢 Run │ 40% │ [查看] │
│ doc-update │ ✅ Done│ 100% │ [报告] │
│ security-scan│ 🟢 Run │ 20% │ [查看] │
└──────────────┴────────┴──────────┴──────────┘
实战场景
场景 1:遗留系统现代化
背景: 一个 5 年历史的 Django 2.0 项目,需要升级到 Django 5.0,同时迁移到 async ORM。
工作流配置:
# django-upgrade.yaml
name: django-2-to-5-upgrade
description: Django 2.0 → 5.0 升级工作流
phases:
1-analysis:
steps:
- agent: code-archaeologist
task: |
扫描整个 Django 项目:
1. 列出所有 Django 2.0 特有的 API 用法
2. 识别不兼容的第三方包
3. 标记所有同步视图需要 async 化的地方
4. 检查数据库迁移兼容性
output: django-upgrade-report.json
2-preparation:
steps:
- agent: dependency-manager
task: |
基于分析报告:
1. 更新 requirements.txt 到兼容版本
2. 替换废弃的包(如 django-celery → celery)
3. 添加 django-upgrade 工具到 CI
depends: [1-analysis]
3-parallel-migration:
parallel:
- agent: async-migrator
task: |
将所有同步视图改为 async:
- views.py 中的函数视图 → async def
- ORM 查询添加 async 支持
- 中间件适配 async
depends: [2-preparation]
- agent: model-updater
task: |
更新模型层:
- 替换废弃的 Field 类型
- 更新 Meta 选项
- 重新生成迁移文件
depends: [2-preparation]
- agent: template-modernizer
task: |
更新模板层:
- 替换废弃的模板标签
- 更新静态文件引用方式
depends: [2-preparation]
4-integration:
steps:
- agent: integrator
task: |
合并所有修改:
1. 解决文件冲突
2. 运行 django-upgrade --check
3. 生成最终迁移计划
depends: [3-parallel-migration]
5-verification:
steps:
- agent: qa-engineer
task: |
全面验证:
1. python manage.py check --deploy
2. python manage.py test --parallel
3. 运行 security check (bandit, safety)
4. 性能基准测试(wrk/ab)
depends: [4-integration]执行:
# 创建工作树隔离
git worktree add ../django-upgrade-workspace
cd ../django-upgrade-workspace
# 启动工作流
claude workflow run django-upgrade.yaml \
--agent-view \
--timeout 2h \
--notify slack:#dev-channel预期结果:
- 自动完成 80%+ 的机械性修改
- 人工只需处理业务逻辑相关的复杂变更
- 升级时间从 2 周缩短到 2 天
场景 2:微服务拆分
背景: 单体电商应用(20万行代码)需要拆分为:用户服务、订单服务、库存服务、支付服务。
执行策略:
# microservice-split.py
from claude_agent_sdk import AgentTeam, Workflow
team = AgentTeam(name="microservice-migration")
# 添加领域专家代理
team.add_agent("user-domain-expert",
instructions="用户领域专家:理解用户认证、权限、Profile 等业务逻辑")
team.add_agent("order-domain-expert",
instructions="订单领域专家:理解订单状态机、优惠计算、拆单逻辑")
team.add_agent("api-designer",
instructions="API 设计专家:设计 RESTful API,确保向后兼容")
# 定义工作流
workflow = Workflow([
# 阶段1:领域分析
{"agent": "user-domain-expert", "task": "提取用户相关代码到独立服务"},
{"agent": "order-domain-expert", "task": "提取订单相关代码到独立服务"},
# 阶段2:API 设计(并行)
{"agent": "api-designer", "task": "设计服务间通信 API", "parallel": True},
# 阶段3:数据迁移
{"agent": "data-migrator", "task": "拆分数据库表到各自服务"},
# 阶段4:集成测试
{"agent": "tester", "task": "端到端测试所有服务"}
])
# 执行并监控
result = team.execute(workflow)
for event in result.stream():
print(f"[{event.agent}] {event.status}: {event.message}")实战:大规模重构
场景:将单体应用拆分为微服务
name: monolith-to-microservices
team:
- name: analyzer
role: 分析单体应用边界
- name: service-creator
role: 创建微服务骨架
- name: api-gateway-dev
role: 开发 API 网关
- name: data-migrator
role: 设计数据迁移方案
- name: tester
role: 验证拆分后功能
workflow:
phase-1-analysis:
- analyzer:
task: 识别领域边界和拆分点
output: bounded_contexts.json
phase-2-parallel-implementation:
parallel:
- service-creator:
task: 创建用户服务
input: bounded_contexts.json
context: user-service
- service-creator:
task: 创建订单服务
input: bounded_contexts.json
context: order-service
- api-gateway-dev:
task: 开发路由和认证
depends: [user-service, order-service]
phase-3-migration:
- data-migrator:
task: 执行数据迁移
depends: phase-2
phase-4-verification:
- tester:
task: 端到端测试
depends: phase-3最佳实践
| ✅ 推荐 | ❌ 避免 |
|---|---|
| 明确每个代理的职责范围 | 代理职责重叠 |
| 定义清晰的完成标准 | 模糊的 "做好" |
| 设置合理的超时 | 无限期等待 |
| 保留人工确认点 | 完全无人干预 |
| 记录完整的执行日志 | 丢失审计追踪 |