目标:使用 Claude Agent SDK 开发自定义 AI Agent,将 Claude 能力集成到自己的应用中
预计时间:45 分钟
对应官方文档:Agent SDK Overview、Python Reference、TypeScript Reference
什么是 Agent SDK?
Agent SDK 让你以编程方式控制 Claude Code:
- 🐍 Python SDK
- 📘 TypeScript SDK
- ⚡ 直接调用底层 API
使用场景:
- 构建内部开发工具平台
- 自动化代码审查流水线
- 创建自定义 IDE 插件
- 批量代码迁移工具
快速开始(Python)
安装
pip install claude-agent-sdk

Hello World
from claude_agent_sdk import Agent
# 创建 Agent
agent = Agent(
model="claude-sonnet-4-6",
project_dir="./my-project"
)
# 运行任务
result = agent.run("重构 auth.py,使用 JWT 替代 session")
print(result.summary)
print(result.files_modified)

流式输出
# 实时看到思考过程
for event in agent.run_stream("优化数据库查询"):
if event.type == "thinking":
print(f"🤔 {event.content}")
elif event.type == "tool_call":
print(f"🔧 调用: {event.tool_name}")
elif event.type == "text":
print(f"💬 {event.content}")

TypeScript SDK 快速开始
除了 Python SDK,Claude 还提供官方 TypeScript SDK,适合前端和 Node.js 项目集成。
安装
npm install @anthropic/agent-sdk

Hello World
import { Agent } from '@anthropic/agent-sdk';
const agent = new Agent({
model: 'claude-sonnet-4-6',
projectDir: './my-project',
apiKey: process.env.ANTHROPIC_API_KEY
});
async function main() {
const result = await agent.run('重构 auth.ts,使用 JWT 替代 session');
console.log(result.summary);
console.log(result.filesModified);
}
main().catch(console.error);

流式输出(TypeScript)
const stream = agent.runStream('优化数据库查询');
for await (const event of stream) {
switch (event.type) {
case 'thinking':
process.stdout.write(`🤔 ${event.content}`);
break;
case 'tool_call':
console.log(`🔧 调用: ${event.toolName}`);
break;
case 'text':
process.stdout.write(`💬 ${event.content}`);
break;
case 'error':
console.error(`❌ 错误: ${event.error}`);
break;
}
}

与 Express 集成
import express from 'express';
import { Agent } from '@anthropic/agent-sdk';
const app = express();
app.use(express.json());
const agent = new Agent({
model: 'claude-sonnet-4-6',
projectDir: '/var/projects/backend'
});
// POST /api/review — ask the agent to review one file and return its findings.
// Request body: { filePath: string (required), focus?: string }.
// Responds 400 on invalid input, 500 when the agent call throws.
app.post('/api/review', async (req, res) => {
  // req.body can be undefined when no JSON body was sent; default to {} so
  // the destructuring below cannot throw outside the try block.
  const { filePath, focus } = req.body ?? {};

  // filePath is client-supplied and ends up inside the agent prompt, so
  // reject missing values and parent-directory segments before using it.
  if (typeof filePath !== 'string' || filePath.trim() === '') {
    return res.status(400).json({ success: false, error: 'filePath is required' });
  }
  if (filePath.split(/[\\/]/).includes('..')) {
    return res.status(400).json({ success: false, error: 'filePath must not contain ".."' });
  }

  // Fall back to the default focus unless the client sent a non-empty string.
  const reviewFocus =
    typeof focus === 'string' && focus.trim() !== '' ? focus : '安全性与性能';

  try {
    const result = await agent.run(
      `审查文件 ${filePath},重点关注: ${reviewFocus}`
    );
    res.json({
      success: true,
      summary: result.summary,
      suggestions: result.suggestions,
      issues: result.issues || []
    });
  } catch (error) {
    // A thrown value is not guaranteed to be an Error instance.
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});
app.listen(3000, () => console.log('Review API on port 3000'));

核心概念
Agent Loop(代理循环)
```mermaid
flowchart TD
A[用户输入任务] --> B{Agent Loop}
B --> C[AI 思考<br/>生成工具调用计划]
C --> D{需要工具?}
D -->|是| E[执行工具<br/>读文件 / 运行命令]
E --> F[观察执行结果]
F --> C
D -->|否| G[生成最终回答]
G --> H{循环次数 < max?}
H -->|是| B
H -->|否| I[返回结果]
B -.->|超时检测| J[中断并返回部分结果]
style C fill:#e1f5fe
style E fill:#fff3e0
style I fill:#e8f5e9
```

```python
from claude_agent_sdk import AgentLoop
loop = AgentLoop(
agent=agent,
max_iterations=50, # 最大循环次数
timeout=300 # 超时时间(秒)
)
result = loop.run("实现用户注册功能")
```

Session 管理
# 创建会话
session = agent.create_session("feature-auth")
# 多轮对话
session.send("第一步:创建用户模型")
session.send("第二步:添加 API 端点")
session.send("第三步:写测试")
# 保存会话状态
session.save("./sessions/auth.pkl")
# 恢复会话
session = agent.load_session("./sessions/auth.pkl")

工具系统
内置工具
from claude_agent_sdk.tools import FileRead, Bash, Git
agent = Agent(
tools=[
FileRead(), # 读取文件
Bash(allowed=["pytest", "black", "git"]), # 受限的 Bash
Git() # Git 操作
]
)

自定义工具
from claude_agent_sdk import tool
@tool
def deploy_to_staging(branch: str) -> str:
    """Deploy ``branch`` to the staging environment via ``./scripts/deploy.sh``.

    Args:
        branch: Name of the branch handed to the deploy script.

    Returns:
        The script's standard output when it exits with status 0. Otherwise a
        message carrying the exit code and the captured stderr (or stdout), so
        a failed deployment is never reported as an empty or partial success.
    """
    import subprocess

    # An argument list (no shell) keeps the branch name from being
    # interpreted by a shell.
    result = subprocess.run(
        ["./scripts/deploy.sh", branch, "staging"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        details = (result.stderr or result.stdout).strip()
        return f"Deployment failed (exit code {result.returncode}): {details}"
    return result.stdout
agent = Agent(tools=[deploy_to_staging])
# AI 会自动调用
task = "修复 bug 后部署到预发布环境验证"
result = agent.run(task)

底层 API 直接调用
对于需要极致控制权的场景,可以直接调用 Claude Code 底层 REST API,绕过 SDK 的高级封装。
认证与基础调用
import requests
import os
API_BASE = "https://api.claude.code/v1"
API_KEY = os.environ["ANTHROPIC_API_KEY"]
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"X-Client-Version": "1.0.0"
}
def create_session(project_dir: str) -> str:
    """Create a new session and return its ``session_id``.

    Args:
        project_dir: Project directory sent to the API as ``project_dir``.

    Returns:
        The ``session_id`` field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within 30 seconds.
    """
    resp = requests.post(
        f"{API_BASE}/sessions",
        headers=headers,
        json={"project_dir": project_dir, "context": "api-direct"},
        # requests applies no timeout by default; without one a stalled
        # connection would block the caller indefinitely.
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["session_id"]
def run_task(session_id: str, task: str, tools: list = None) -> dict:
    """Run ``task`` in an existing session and return the decoded JSON reply.

    Args:
        session_id: Identifier of the session to run in.
        task: Task description sent as ``task``.
        tools: Tool names to enable; a falsy value selects
            ``["file_read", "bash", "git"]``.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    enabled_tools = tools if tools else ["file_read", "bash", "git"]
    endpoint = f"{API_BASE}/sessions/{session_id}/run"
    body = dict(
        task=task,
        model="claude-sonnet-4-6",
        max_iterations=50,
        tools=enabled_tools,
        stream=False,
    )
    reply = requests.post(endpoint, headers=headers, json=body, timeout=300)
    reply.raise_for_status()
    return reply.json()
def stream_task(session_id: str, task: str):
    """Run ``task`` with streaming enabled and yield each non-empty response line.

    Args:
        session_id: Identifier of the session to run in.
        task: Task description sent as ``task``.

    Yields:
        Each non-empty line of the streamed response, decoded as UTF-8.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    endpoint = f"{API_BASE}/sessions/{session_id}/run"
    body = dict(task=task, model="claude-sonnet-4-6", stream=True)
    response = requests.post(
        endpoint, headers=headers, json=body, stream=True, timeout=300
    )
    # The response is a context manager; leaving the block releases the
    # connection even if the consumer stops iterating early.
    with response:
        response.raise_for_status()
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            yield raw_line.decode("utf-8")
# 使用示例
if __name__ == "__main__":
sid = create_session("./my-project")
result = run_task(sid, "列出 src/ 下所有 Python 文件并分析复杂度")
print(f"状态: {result['status']}")
print(f"摘要: {result['summary']}")

WebSocket 实时通信
import asyncio
import websockets
import json
async def realtime_agent(project_dir: str, task: str):
    """Drive an agent over a WebSocket, printing events until completion.

    Sends an ``init`` message followed by a ``run`` message, then handles
    incoming events: ``thinking`` and ``tool_call`` are printed,
    ``permission_request`` prompts the operator on stdin and replies with a
    ``permission_response``, and ``complete`` prints the summary and stops.

    Args:
        project_dir: Project directory sent in the ``init`` message.
        task: Task description sent in the ``run`` message.
    """
    # NOTE(review): the API key travels in the URL query string, where proxies
    # and access logs commonly record it; prefer a header-based credential if
    # the endpoint accepts one — TODO confirm against the API documentation.
    uri = f"wss://api.claude.code/v1/sessions/ws?token={API_KEY}"
    loop = asyncio.get_running_loop()

    async with websockets.connect(uri) as ws:
        # Initialise the session.
        await ws.send(json.dumps({
            "type": "init",
            "project_dir": project_dir,
            "model": "claude-sonnet-4-6",
        }))
        # Submit the task.
        await ws.send(json.dumps({
            "type": "run",
            "task": task,
        }))
        # Consume events as they arrive.
        async for message in ws:
            event = json.loads(message)
            kind = event["type"]
            if kind == "thinking":
                print(f"🤔 {event['content']}")
            elif kind == "tool_call":
                print(f"🔧 工具: {event['tool']}({event['args']})")
            elif kind == "permission_request":
                # Human-in-the-loop confirmation.
                print(f"⚠️ 需要权限: {event['description']}")
                # input() blocks; run it in a worker thread so the event loop
                # keeps servicing the connection while the operator decides.
                choice = await loop.run_in_executor(None, input, "允许? (y/n): ")
                await ws.send(json.dumps({
                    "type": "permission_response",
                    "id": event["id"],
                    "approved": choice.strip().lower() == "y",
                }))
            elif kind == "complete":
                print(f"✅ 完成: {event['summary']}")
                break
# asyncio.run(realtime_agent("./project", "实现登录功能"))

权限控制
声明式权限
from claude_agent_sdk import PermissionMode
agent = Agent(
permission_mode=PermissionMode.ASK, # 每次询问
# 或细粒度控制
permissions={
"file:write": "ask", # 写文件要确认
"file:read": "allow", # 读文件自动
"bash:run": {
"pytest *": "allow", # 测试自动
"rm *": "deny", # 删除禁止
"*": "ask" # 其他询问
}
}
)

Hooks 拦截
# 在关键节点插入自定义逻辑
@agent.hook("before_tool_call")
def log_tool_call(tool_name, args):
logger.info(f"AI 调用工具: {tool_name}")
@agent.hook("after_file_write")
def notify_team(file_path):
slack.send(f"AI 修改了: {file_path}")

生产部署
Docker 部署
FROM python:3.11-slim
WORKDIR /app

# Copy the dependency manifest first so the install layer stays cached until
# requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# ANTHROPIC_API_KEY is deliberately not set in the image: without a matching
# ARG, ${ANTHROPIC_API_KEY} expands to an empty string at build time, and with
# one the secret would be stored in the image metadata. Supply it at run time:
#   docker run -e ANTHROPIC_API_KEY=... <image>
CMD ["python", "agent_service.py"]

Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: claude-agent
spec:
  replicas: 3
  # apps/v1 requires a selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: claude-agent
  template:
    metadata:
      labels:
        app: claude-agent
    spec:
      containers:
        - name: agent
          image: my-registry/claude-agent:v1
          env:
            # The API key is read from a Secret rather than written inline.
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: anthropic-api
                  key: token
          resources:
            limits:
              memory: "2Gi"
              cpu: "1000m"

可观测性
from claude_agent_sdk import OpenTelemetryConfig
agent = Agent(
telemetry=OpenTelemetryConfig(
exporter="otlp",
endpoint="http://jaeger:4317",
service_name="code-review-agent"
)
)

完整示例:自动化代码审查
#!/usr/bin/env python3
"""自动化代码审查 Agent"""
import asyncio
from claude_agent_sdk import Agent, AgentLoop
from claude_agent_sdk.tools import FileRead, Bash, Git
class CodeReviewAgent:
    """Review the files changed on a branch using one read-only ``Agent``."""

    def __init__(self, project_dir: str):
        """Build the agent for ``project_dir`` with file writes denied."""
        review_tools = [
            FileRead(),
            Bash(allowed=["pytest", "black", "flake8", "git"]),
            Git(),
        ]
        review_permissions = {
            "file:write": "deny",  # review only, never modify files
            "file:read": "allow",
            "bash:run": {
                "pytest *": "allow",
                "black --check *": "allow",
                "flake8 *": "allow",
            },
        }
        self.agent = Agent(
            model="claude-sonnet-4-6",
            project_dir=project_dir,
            tools=review_tools,
            permissions=review_permissions,
        )

    async def review_pr(self, branch: str) -> dict:
        """Review every file changed on ``branch`` and return a report.

        Returns:
            A dict with ``summary`` (text from ``_summarize``), ``issues`` and
            ``suggestions`` (one entry per reviewed file, in file order).
        """
        # Ask the agent which files changed.
        diff = await self.agent.run(f"获取 {branch} 分支的变更文件列表")

        checklist = (
            "1. 安全性(SQL注入、XSS等)\n"
            "2. 性能问题\n"
            "3. 代码风格\n"
            "4. 测试覆盖"
        )
        # One review per file, awaited together.
        pending = [
            self.agent.run(f"审查文件 {path},关注:\n" + checklist)
            for path in diff.changed_files
        ]
        results = await asyncio.gather(*pending)

        # Assemble the report.
        issues_per_file = []
        suggestions_per_file = []
        for outcome in results:
            issues_per_file.append(outcome.issues)
            suggestions_per_file.append(outcome.suggestions)
        return {
            "summary": self._summarize(results),
            "issues": issues_per_file,
            "suggestions": suggestions_per_file,
        }

    def _summarize(self, results):
        """Return a one-line count of issues found and files reviewed."""
        issue_count = 0
        for outcome in results:
            issue_count += len(outcome.issues)
        file_count = len(results)
        return f"发现 {issue_count} 个问题,{file_count} 个文件已审查"
# 使用
async def main():
reviewer = CodeReviewAgent("./my-project")
report = await reviewer.review_pr("feature/new-auth")
print(report)
if __name__ == "__main__":
asyncio.run(main())

企业级实战场景
场景一:内部代码审查平台(FastAPI 微服务)
某 500 人研发团队需要统一代码审查标准,将 Agent SDK 封装为内部服务,供 GitHub/GitLab Webhook 调用。
# review_service.py - 生产级封装
import asyncio
import hashlib
import hmac
import os
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from claude_agent_sdk import Agent, PermissionMode
app = FastAPI(title="Claude Code Review Service", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["https://github.company.com", "https://gitlab.company.com"],
allow_methods=["POST"],
)
# 配置
GITHUB_SECRET = os.environ["GITHUB_WEBHOOK_SECRET"]
REVIEW_CONFIG = {
"model": "claude-sonnet-4-6",
"max_files_per_review": 20,
"max_lines_per_file": 500,
"focus_areas": ["security", "performance", "maintainability"]
}
# Request model describing one pull request to review.
class ReviewJob(BaseModel):
    repo: str  # the webhook handler fills this from repository.full_name
    pr_number: int  # pull request number
    base_sha: str  # commit the diff starts from
    head_sha: str  # commit the diff ends at
    changed_files: list[str]  # file names; the webhook handler reads them from the payload's "files" entries
    author: str  # login of the pull request's user
    team: Optional[str] = None  # optional; not set anywhere in the visible code
class ReviewService:
    """Run read-only pull-request reviews through one shared ``Agent``.

    ``active_jobs`` maps a job id to a status record whose ``status`` is one
    of ``queued``, ``running``, ``completed`` or ``failed``.
    """

    def __init__(self):
        """Create the review agent (file writes denied) and the job table."""
        self.agent = Agent(
            model=REVIEW_CONFIG["model"],
            permission_mode=PermissionMode.ASK,
            permissions={
                "file:write": "deny",  # review only: never modify files
                "file:read": "allow",
                "bash:run": {
                    "git *": "allow",
                    "pytest *": "allow",
                    "*": "deny",
                },
            },
        )
        self.active_jobs = {}

    @staticmethod
    def new_job_id(job: ReviewJob) -> str:
        """Return a 12-hex-character id derived from repo, PR number and the current UTC time."""
        from datetime import timezone

        seed = f"{job.repo}:{job.pr_number}:{datetime.now(timezone.utc).isoformat()}"
        return hashlib.sha256(seed.encode()).hexdigest()[:12]

    async def review_pr(self, job: ReviewJob, job_id: Optional[str] = None) -> dict:
        """Review a pull request and return the report.

        Args:
            job: The pull request to review.
            job_id: Id under which progress is recorded in ``active_jobs``.
                When omitted a new id is generated, as before; passing one
                lets the caller hand the id out before the review finishes.

        Returns:
            The report dict, also stored under ``active_jobs[job_id]["report"]``.
            ``failed_reviews`` lists the error text of per-file reviews that
            raised, so dropped files stay visible.

        Raises:
            Exception: Any error from the diff or summary step is recorded as
                a ``failed`` job and re-raised.
        """
        from datetime import timezone

        if job_id is None:
            job_id = self.new_job_id(job)
        self.active_jobs[job_id] = {"status": "running", "progress": 0}
        try:
            # 1. Fetch the diff.
            diff_result = await self.agent.run(
                f"在 {job.repo} 中比较 {job.base_sha}..{job.head_sha} 的变更,"
                f"列出前 {REVIEW_CONFIG['max_files_per_review']} 个变更文件"
            )

            # 2. Review the files concurrently, capped at the configured limit.
            max_files = REVIEW_CONFIG["max_files_per_review"]
            review_tasks = [
                self._review_single_file(path, job)
                for path in diff_result.changed_files[:max_files]
            ]
            outcomes = await asyncio.gather(*review_tasks, return_exceptions=True)
            # return_exceptions=True can also hand back BaseException
            # subclasses (e.g. CancelledError), so test against BaseException.
            file_reviews = [r for r in outcomes if not isinstance(r, BaseException)]
            failed_reviews = [str(r) for r in outcomes if isinstance(r, BaseException)]

            # 3. Summarise the successful reviews.
            summary = await self.agent.run(
                "汇总以下各文件审查结果,生成统一 PR 审查报告:\n"
                + "\n".join(str(r) for r in file_reviews)
            )
            report = {
                "job_id": job_id,
                "repo": job.repo,
                "pr_number": job.pr_number,
                "summary": summary.content,
                "file_reviews": file_reviews,
                "failed_reviews": failed_reviews,
                # Timezone-aware UTC timestamp; datetime.utcnow() is naive.
                "completed_at": datetime.now(timezone.utc).isoformat(),
            }
            self.active_jobs[job_id] = {"status": "completed", "report": report}
            return report
        except Exception as e:
            self.active_jobs[job_id] = {"status": "failed", "error": str(e)}
            raise

    async def _review_single_file(self, file_path: str, job: ReviewJob) -> dict:
        """Review one file and return its ``file``, ``issues`` and ``suggestions``."""
        focus = ", ".join(REVIEW_CONFIG["focus_areas"])
        result = await self.agent.run(
            f"审查文件 {file_path},关注: {focus}\n"
            "对每个发现的问题,给出:严重程度、行号、修复建议。"
        )
        return {
            "file": file_path,
            "issues": result.issues or [],
            "suggestions": result.suggestions or [],
        }


service = ReviewService()

# Strong references to in-flight review tasks. The event loop keeps only weak
# references, so a task that nothing else refers to may be garbage-collected
# before it finishes.
_background_tasks = set()


def _forget_background_task(task):
    """Drop a finished review task and mark its exception as retrieved."""
    _background_tasks.discard(task)
    if not task.cancelled():
        # review_pr already stored the failure in active_jobs; reading the
        # exception here only silences the "never retrieved" warning.
        task.exception()


@app.post("/webhook/github")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str = Header(None)
):
    """Receive a GitHub pull-request webhook and queue a review.

    Returns ``{"ignored": True}`` for actions other than ``opened`` and
    ``synchronize``; otherwise queues the review and returns its ``job_id``.

    Raises:
        HTTPException: 401 when the ``X-Hub-Signature-256`` header does not
            match the HMAC-SHA256 of the body.
    """
    body = await request.body()

    # Verify the signature. Compare as bytes: compare_digest raises TypeError
    # for str arguments containing non-ASCII characters, which would turn a
    # malformed header into a 500 instead of a 401.
    expected = "sha256=" + hmac.new(
        GITHUB_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    received = x_hub_signature_256 or ""
    if not hmac.compare_digest(expected.encode(), received.encode()):
        raise HTTPException(401, "Invalid signature")

    payload = await request.json()
    if payload.get("action") not in ["opened", "synchronize"]:
        return {"ignored": True}

    pr = payload["pull_request"]
    job = ReviewJob(
        repo=payload["repository"]["full_name"],
        pr_number=pr["number"],
        base_sha=pr["base"]["sha"],
        head_sha=pr["head"]["sha"],
        # NOTE(review): review_pr derives the file list from the diff and does
        # not read this field; whether the payload carries "files" at all is
        # unverified — TODO confirm.
        changed_files=[f["filename"] for f in pr.get("files", [])],
        author=pr["user"]["login"]
    )

    # Register the job before starting it so the returned id can be looked up
    # immediately, then run the review in the background.
    job_id = service.new_job_id(job)
    service.active_jobs[job_id] = {"status": "queued", "progress": 0}
    task = asyncio.create_task(service.review_pr(job, job_id=job_id))
    _background_tasks.add(task)
    task.add_done_callback(_forget_background_task)
    return {"accepted": True, "message": "Review job queued", "job_id": job_id}
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the stored record for ``job_id``, or ``{"status": "not_found"}`` for unknown ids."""
    jobs = service.active_jobs
    if job_id in jobs:
        return jobs[job_id]
    return {"status": "not_found"}
# 部署: uvicorn review_service:app --host 0.0.0.0 --port 8080

场景二:批量代码迁移工具(Python 2→3 跨仓库)
企业维护 200+ 遗留 Python 2 仓库,需要批量、可控地完成迁移。
# batch_migration.py
import asyncio
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from claude_agent_sdk import Agent, AgentLoop, PermissionMode
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("migration")
@dataclass
class RepoConfig:
    """Settings for one repository in the migration batch."""

    # Repository name; also the clone sub-directory and the log prefix.
    name: str
    # URL handed to ``git clone``.
    clone_url: str
    # Name of the repository's default branch.
    default_branch: str
    # One of: high / medium / low.
    criticality: str
    # Shell command run to verify the migrated code.
    test_command: str = "pytest tests/ -x -q"
MIGRATION_REPOS = [
RepoConfig("billing-service", "[email protected]:company/billing.git", "main", "high", "pytest"),
RepoConfig("user-api", "[email protected]:company/user-api.git", "master", "high", "pytest"),
RepoConfig("report-generator", "[email protected]:company/reports.git", "main", "medium", "python -m unittest discover"),
]
class BatchMigration:
    """Clone, analyse, auto-fix, test and push Python 2→3 migrations.

    The outcome of every repository is appended to ``self.results``.
    """

    def __init__(self, work_dir: str = "/tmp/migration"):
        """Create the working directory and the agent used for migrations.

        Args:
            work_dir: Directory under which each repository is cloned.
        """
        self.work_dir = Path(work_dir)
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.results = []
        # Dedicated agent for migration work (may write files and run git).
        self.agent = Agent(
            model="claude-opus",
            permission_mode=PermissionMode.ASK,
            permissions={
                "file:write": "allow",  # migration has to modify code
                "file:read": "allow",
                "bash:run": {
                    "git *": "allow",
                    "2to3 *": "allow",
                    "pytest *": "allow",
                    "python *": "allow",
                    "pip *": "allow",
                    "rm -rf /": "deny",
                    "*": "ask",
                },
            },
        )

    async def migrate_repo(self, repo: "RepoConfig") -> dict:
        """Migrate one repository and return its result record.

        Phases: clone, analysis, auto-fix on a new branch, test run, and (only
        when the tests pass) commit and push. Clone, checkout and commit/push
        must succeed; a failure there is reported as ``status == "error"``
        instead of being silently ignored.

        Returns:
            A dict with ``repo``, ``status`` (``success``,
            ``needs_manual_review`` or ``error``), ``phases`` and, depending
            on the outcome, ``branch`` or ``error``.
        """
        import shlex

        repo_dir = self.work_dir / repo.name
        repo_path = str(repo_dir)
        result = {"repo": repo.name, "status": "pending", "phases": []}
        try:
            # Phase 1: clone. URL and path are shell-quoted because the
            # command goes through a shell.
            logger.info(f"[{repo.name}] Phase 1: Clone")
            await self._run_bash(
                f"git clone {shlex.quote(repo.clone_url)} {shlex.quote(repo_path)}",
                check=True,
            )
            result["phases"].append({"phase": "clone", "status": "ok"})

            # Phase 2: analysis (read-only).
            logger.info(f"[{repo.name}] Phase 2: Analysis")
            analysis = await self.agent.run(
                f"在 {repo_dir} 中扫描 Python 2 兼容性问题,"
                "输出 JSON 格式的详细问题清单(含文件、行号、建议修复)",
                project_dir=repo_path
            )
            result["phases"].append({"phase": "analysis", "status": "ok", "details": analysis.summary})

            # Phase 3: auto-fix on a fresh branch.
            logger.info(f"[{repo.name}] Phase 3: Auto-fix")
            branch_name = f"auto/migrate-py3-{os.urandom(4).hex()}"
            # Commands run with cwd=repo_path; an extra "cd" would resolve a
            # relative work_dir twice and fail.
            await self._run_bash(
                f"git checkout -b {branch_name}",
                project_dir=repo_path,
                check=True,
            )
            fix_result = await self.agent.run(
                f"在 {repo_dir} 中自动修复所有 Python 2→3 兼容性问题,"
                "包括:print 语句、xrange、unicode 处理、除法运算、异常语法、导入调整。"
                "修复后运行 2to3 确认无遗漏。",
                project_dir=repo_path
            )
            result["phases"].append({"phase": "auto-fix", "status": "ok", "files_modified": fix_result.files_modified})

            # Phase 4: verification. A non-zero exit is an expected outcome
            # here, so it is inspected rather than raised.
            logger.info(f"[{repo.name}] Phase 4: Verification")
            test_result = await self._run_bash(
                repo.test_command,
                project_dir=repo_path
            )
            tests_passed = test_result.returncode == 0
            result["phases"].append({"phase": "test", "status": "pass" if tests_passed else "fail"})

            # Phase 5: commit and push, only when the tests passed.
            if tests_passed:
                await self._run_bash(
                    "git add -A && "
                    'git commit -m "chore: auto-migrate Python 2 to 3" && '
                    f"git push origin {branch_name}",
                    project_dir=repo_path,
                    check=True,
                )
                result["status"] = "success"
                result["branch"] = branch_name
            else:
                result["status"] = "needs_manual_review"
        except Exception as e:
            logger.exception(f"[{repo.name}] Migration failed")
            result["status"] = "error"
            result["error"] = str(e)
        self.results.append(result)
        return result

    async def _run_bash(self, command: str, project_dir: str = None, *, check: bool = False):
        """Run a shell command without blocking the event loop.

        Args:
            command: Command line passed to the shell.
            project_dir: Working directory for the command, or ``None`` for
                the current one.
            check: When true, raise if the command exits with a non-zero
                status.

        Returns:
            The ``subprocess.CompletedProcess`` with captured text output.

        Raises:
            RuntimeError: If ``check`` is true and the command failed.
        """
        import subprocess

        def _invoke():
            return subprocess.run(
                command, shell=True, capture_output=True, text=True, cwd=project_dir
            )

        # subprocess.run blocks; run it in a worker thread so other
        # migrations admitted by the semaphore keep making progress.
        completed = await asyncio.get_running_loop().run_in_executor(None, _invoke)
        if check and completed.returncode != 0:
            output = (completed.stderr or completed.stdout).strip()
            raise RuntimeError(
                f"Command failed (exit {completed.returncode}): {command}\n{output}"
            )
        return completed

    async def run_all(self, max_parallel: int = 3):
        """Migrate every repository in ``MIGRATION_REPOS`` and write a JSON report.

        Args:
            max_parallel: Maximum number of migrations running at once.
        """
        semaphore = asyncio.Semaphore(max_parallel)

        async def limited_migrate(repo):
            async with semaphore:
                return await self.migrate_repo(repo)

        tasks = [limited_migrate(repo) for repo in MIGRATION_REPOS]
        await asyncio.gather(*tasks)

        # Write the summary report.
        report_path = self.work_dir / "migration-report.json"
        report_path.write_text(json.dumps(self.results, indent=2, ensure_ascii=False))
        logger.info(f"Report saved to {report_path}")
        success = sum(1 for r in self.results if r["status"] == "success")
        logger.info(f"Migration complete: {success}/{len(MIGRATION_REPOS)} succeeded")
# 执行
if __name__ == "__main__":
migrator = BatchMigration()
asyncio.run(migrator.run_all(max_parallel=2))