Sooua
登录
返回文章列表
Claude Code··12 分钟阅读

Agent SDK:将 Claude Code 嵌入应用

Agent SDK 让你以编程方式控制 Claude Code:

目标:使用 Claude Agent SDK 开发自定义 AI Agent,将 Claude 能力集成到自己的应用中
预计时间:45 分钟
对应官方文档:Agent SDK OverviewPython ReferenceTypeScript Reference


什么是 Agent SDK?

Agent SDK 让你以编程方式控制 Claude Code:

  • 🐍 Python SDK
  • 📘 TypeScript SDK
  • ⚡ 直接调用底层 API

使用场景:

  • 构建内部开发工具平台
  • 自动化代码审查流水线
  • 创建自定义 IDE 插件
  • 批量代码迁移工具

快速开始(Python)

安装

pip install anthropic-agent-sdk

Hello World

from claude_agent_sdk import Agent
 
# 创建 Agent
agent = Agent(
    model="claude-sonnet-4-6",
    project_dir="./my-project"
)
 
# 运行任务
result = agent.run("重构 auth.py,使用 JWT 替代 session")
 
print(result.summary)
print(result.files_modified)

流式输出

# 实时看到思考过程
for event in agent.run_stream("优化数据库查询"):
    if event.type == "thinking":
        print(f"🤔 {event.content}")
    elif event.type == "tool_call":
        print(f"🔧 调用: {event.tool_name}")
    elif event.type == "text":
        print(f"💬 {event.content}")

TypeScript SDK 快速开始

除了 Python SDK,Claude 还提供官方 TypeScript SDK,适合前端和 Node.js 项目集成。

安装

npm install @anthropic/agent-sdk

Hello World

import { Agent } from '@anthropic/agent-sdk';
 
const agent = new Agent({
  model: 'claude-sonnet-4-6',
  projectDir: './my-project',
  apiKey: process.env.ANTHROPIC_API_KEY
});
 
async function main() {
  const result = await agent.run('重构 auth.ts,使用 JWT 替代 session');
  console.log(result.summary);
  console.log(result.filesModified);
}
 
main().catch(console.error);

流式输出(TypeScript)

const stream = agent.runStream('优化数据库查询');
 
for await (const event of stream) {
  switch (event.type) {
    case 'thinking':
      process.stdout.write(`🤔 ${event.content}`);
      break;
    case 'tool_call':
      console.log(`🔧 调用: ${event.toolName}`);
      break;
    case 'text':
      process.stdout.write(`💬 ${event.content}`);
      break;
    case 'error':
      console.error(`❌ 错误: ${event.error}`);
      break;
  }
}

与 Express 集成

import express from 'express';
import { Agent } from '@anthropic/agent-sdk';
 
const app = express();
app.use(express.json());
 
const agent = new Agent({
  model: 'claude-sonnet-4-6',
  projectDir: '/var/projects/backend'
});
 
app.post('/api/review', async (req, res) => {
  const { filePath, focus } = req.body;
  
  try {
    const result = await agent.run(
      `审查文件 ${filePath},重点关注: ${focus || '安全性与性能'}`
    );
    
    res.json({
      success: true,
      summary: result.summary,
      suggestions: result.suggestions,
      issues: result.issues || []
    });
  } catch (error) {
    res.status(500).json({ success: false, error: error.message });
  }
});
 
app.listen(3000, () => console.log('Review API on port 3000'));

核心概念

Agent Loop(代理循环)

```mermaid
flowchart TD
    A[用户输入任务] --> B{Agent Loop}
    B --> C[AI 思考<br/>生成工具调用计划]
    C --> D{需要工具?}
    D -->|是| E[执行工具<br/>读文件 / 运行命令]
    E --> F[观察执行结果]
    F --> C
    D -->|否| G[生成最终回答]
    G --> H{循环次数 < max?}
    H -->|是| B
    H -->|否| I[返回结果]
    B -.->|超时检测| J[中断并返回部分结果]
    style C fill:#e1f5fe
    style E fill:#fff3e0
    style I fill:#e8f5e9

```python
from claude_agent_sdk import AgentLoop

loop = AgentLoop(
    agent=agent,
    max_iterations=50,      # 最大循环次数
    timeout=300             # 超时时间(秒)
)

result = loop.run("实现用户注册功能")

Session 管理

# 创建会话
session = agent.create_session("feature-auth")
 
# 多轮对话
session.send("第一步:创建用户模型")
session.send("第二步:添加 API 端点")
session.send("第三步:写测试")
 
# 保存会话状态
session.save("./sessions/auth.pkl")
 
# 恢复会话
session = agent.load_session("./sessions/auth.pkl")

工具系统

内置工具

from claude_agent_sdk.tools import FileRead, Bash, Git
 
agent = Agent(
    tools=[
        FileRead(),           # 读取文件
        Bash(allowed=["pytest", "black", "git"]),  # 受限的 Bash
        Git()                 # Git 操作
    ]
)

自定义工具

from claude_agent_sdk import tool
 
@tool
def deploy_to_staging(branch: str) -> str:
    """部署指定分支到预发布环境"""
    import subprocess
    result = subprocess.run(
        ["./scripts/deploy.sh", branch, "staging"],
        capture_output=True, text=True
    )
    return result.stdout
 
agent = Agent(tools=[deploy_to_staging])
 
# AI 会自动调用
task = "修复 bug 后部署到预发布环境验证"
result = agent.run(task)

底层 API 直接调用

对于需要极致控制权的场景,可以直接调用 Claude Code 底层 REST API,绕过 SDK 的高级封装。

认证与基础调用

import requests
import os
 
API_BASE = "https://api.claude.code/v1"
API_KEY = os.environ["ANTHROPIC_API_KEY"]
 
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
    "X-Client-Version": "1.0.0"
}
 
def create_session(project_dir: str) -> str:
    """创建新会话并返回 session_id"""
    resp = requests.post(
        f"{API_BASE}/sessions",
        headers=headers,
        json={"project_dir": project_dir, "context": "api-direct"}
    )
    resp.raise_for_status()
    return resp.json()["session_id"]
 
def run_task(session_id: str, task: str, tools: list = None) -> dict:
    """在会话中执行任务"""
    payload = {
        "task": task,
        "model": "claude-sonnet-4-6",
        "max_iterations": 50,
        "tools": tools or ["file_read", "bash", "git"],
        "stream": False
    }
    resp = requests.post(
        f"{API_BASE}/sessions/{session_id}/run",
        headers=headers,
        json=payload,
        timeout=300
    )
    resp.raise_for_status()
    return resp.json()
 
def stream_task(session_id: str, task: str):
    """流式执行任务"""
    payload = {
        "task": task,
        "model": "claude-sonnet-4-6",
        "stream": True
    }
    with requests.post(
        f"{API_BASE}/sessions/{session_id}/run",
        headers=headers,
        json=payload,
        stream=True,
        timeout=300
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line:
                yield line.decode("utf-8")
 
# 使用示例
if __name__ == "__main__":
    sid = create_session("./my-project")
    result = run_task(sid, "列出 src/ 下所有 Python 文件并分析复杂度")
    print(f"状态: {result['status']}")
    print(f"摘要: {result['summary']}")

WebSocket 实时通信

import asyncio
import websockets
import json
 
async def realtime_agent(project_dir: str, task: str):
    """通过 WebSocket 与 Agent 实时双向通信"""
    uri = f"wss://api.claude.code/v1/sessions/ws?token={API_KEY}"
    
    async with websockets.connect(uri) as ws:
        # 初始化会话
        await ws.send(json.dumps({
            "type": "init",
            "project_dir": project_dir,
            "model": "claude-sonnet-4-6"
        }))
        
        # 发送任务
        await ws.send(json.dumps({
            "type": "run",
            "task": task
        }))
        
        # 实时接收事件
        async for message in ws:
            event = json.loads(message)
            
            if event["type"] == "thinking":
                print(f"🤔 {event['content']}")
            elif event["type"] == "tool_call":
                print(f"🔧 工具: {event['tool']}({event['args']})")
            elif event["type"] == "permission_request":
                # 人工介入确认
                print(f"⚠️ 需要权限: {event['description']}")
                choice = input("允许? (y/n): ")
                await ws.send(json.dumps({
                    "type": "permission_response",
                    "id": event["id"],
                    "approved": choice.lower() == "y"
                }))
            elif event["type"] == "complete":
                print(f"✅ 完成: {event['summary']}")
                break
 
# asyncio.run(realtime_agent("./project", "实现登录功能"))

权限控制

声明式权限

from claude_agent_sdk import PermissionMode
 
agent = Agent(
    permission_mode=PermissionMode.ASK,  # 每次询问
    
    # 或细粒度控制
    permissions={
        "file:write": "ask",      # 写文件要确认
        "file:read": "allow",     # 读文件自动
        "bash:run": {
            "pytest *": "allow",  # 测试自动
            "rm *": "deny",       # 删除禁止
            "*": "ask"            # 其他询问
        }
    }
)

Hooks 拦截

# 在关键节点插入自定义逻辑
@agent.hook("before_tool_call")
def log_tool_call(tool_name, args):
    logger.info(f"AI 调用工具: {tool_name}")
    
@agent.hook("after_file_write")
def notify_team(file_path):
    slack.send(f"AI 修改了: {file_path}")

生产部署

Docker 部署

FROM python:3.11-slim
 
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
 
COPY . .
ENV ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
 
CMD ["python", "agent_service.py"]

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: claude-agent
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: agent
        image: my-registry/claude-agent:v1
        env:
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: anthropic-api
              key: token
        resources:
          limits:
            memory: "2Gi"
            cpu: "1000m"

可观测性

from claude_agent_sdk import OpenTelemetryConfig
 
agent = Agent(
    telemetry=OpenTelemetryConfig(
        exporter="otlp",
        endpoint="http://jaeger:4317",
        service_name="code-review-agent"
    )
)

参考:Observability


完整示例:自动化代码审查

#!/usr/bin/env python3
"""自动化代码审查 Agent"""
 
import asyncio
from claude_agent_sdk import Agent, AgentLoop
from claude_agent_sdk.tools import FileRead, Bash, Git
 
class CodeReviewAgent:
    def __init__(self, project_dir: str):
        self.agent = Agent(
            model="claude-sonnet-4-6",
            project_dir=project_dir,
            tools=[
                FileRead(),
                Bash(allowed=["pytest", "black", "flake8", "git"]),
                Git()
            ],
            permissions={
                "file:write": "deny",   # 只读审查
                "file:read": "allow",
                "bash:run": {
                    "pytest *": "allow",
                    "black --check *": "allow",
                    "flake8 *": "allow",
                }
            }
        )
    
    async def review_pr(self, branch: str) -> dict:
        """审查 PR 并返回报告"""
        
        # 获取变更文件
        diff = await self.agent.run(
            f"获取 {branch} 分支的变更文件列表"
        )
        
        # 并行审查
        tasks = []
        for file in diff.changed_files:
            task = self.agent.run(
                f"审查文件 {file},关注:\n"
                "1. 安全性(SQL注入、XSS等)\n"
                "2. 性能问题\n"
                "3. 代码风格\n"
                "4. 测试覆盖"
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        
        # 汇总报告
        report = {
            "summary": self._summarize(results),
            "issues": [r.issues for r in results],
            "suggestions": [r.suggestions for r in results]
        }
        
        return report
    
    def _summarize(self, results):
        total_issues = sum(len(r.issues) for r in results)
        return f"发现 {total_issues} 个问题,{len(results)} 个文件已审查"
 
# 使用
async def main():
    reviewer = CodeReviewAgent("./my-project")
    report = await reviewer.review_pr("feature/new-auth")
    print(report)
 
if __name__ == "__main__":
    asyncio.run(main())

企业级实战场景

场景一:内部代码审查平台(FastAPI 微服务)

某 500 人研发团队需要统一代码审查标准,将 Agent SDK 封装为内部服务,供 GitHub/GitLab Webhook 调用。

# review_service.py - 生产级封装
import asyncio
import hashlib
import hmac
import os
from datetime import datetime
from typing import Optional
 
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from claude_agent_sdk import Agent, PermissionMode
 
app = FastAPI(title="Claude Code Review Service", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://github.company.com", "https://gitlab.company.com"],
    allow_methods=["POST"],
)
 
# 配置
GITHUB_SECRET = os.environ["GITHUB_WEBHOOK_SECRET"]
REVIEW_CONFIG = {
    "model": "claude-sonnet-4-6",
    "max_files_per_review": 20,
    "max_lines_per_file": 500,
    "focus_areas": ["security", "performance", "maintainability"]
}
 
class ReviewJob(BaseModel):
    repo: str
    pr_number: int
    base_sha: str
    head_sha: str
    changed_files: list[str]
    author: str
    team: Optional[str] = None
 
class ReviewService:
    def __init__(self):
        self.agent = Agent(
            model=REVIEW_CONFIG["model"],
            permission_mode=PermissionMode.ASK,
            permissions={
                "file:write": "deny",   # 只读审查,禁止修改
                "file:read": "allow",
                "bash:run": {
                    "git *": "allow",
                    "pytest *": "allow",
                    "*": "deny"
                }
            }
        )
        self.active_jobs = {}
    
    async def review_pr(self, job: ReviewJob) -> dict:
        """执行 PR 审查"""
        job_id = hashlib.sha256(
            f"{job.repo}:{job.pr_number}:{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]
        
        self.active_jobs[job_id] = {"status": "running", "progress": 0}
        
        try:
            # 1. 获取 diff
            diff_result = await self.agent.run(
                f"在 {job.repo} 中比较 {job.base_sha}..{job.head_sha} 的变更,"
                f"列出前 {REVIEW_CONFIG['max_files_per_review']} 个变更文件"
            )
            
            # 2. 按文件并行审查
            review_tasks = []
            for file in diff_result.changed_files[:REVIEW_CONFIG["max_files_per_review"]]:
                task = self._review_single_file(file, job)
                review_tasks.append(task)
            
            file_reviews = await asyncio.gather(*review_tasks, return_exceptions=True)
            
            # 3. 汇总报告
            summary = await self.agent.run(
                "汇总以下各文件审查结果,生成统一 PR 审查报告:\n"
                + "\n".join(str(r) for r in file_reviews if not isinstance(r, Exception))
            )
            
            report = {
                "job_id": job_id,
                "repo": job.repo,
                "pr_number": job.pr_number,
                "summary": summary.content,
                "file_reviews": [
                    r for r in file_reviews if not isinstance(r, Exception)
                ],
                "completed_at": datetime.utcnow().isoformat()
            }
            
            self.active_jobs[job_id] = {"status": "completed", "report": report}
            return report
            
        except Exception as e:
            self.active_jobs[job_id] = {"status": "failed", "error": str(e)}
            raise
    
    async def _review_single_file(self, file_path: str, job: ReviewJob) -> dict:
        """审查单个文件"""
        focus = ", ".join(REVIEW_CONFIG["focus_areas"])
        result = await self.agent.run(
            f"审查文件 {file_path},关注: {focus}\n"
            "对每个发现的问题,给出:严重程度、行号、修复建议。"
        )
        return {
            "file": file_path,
            "issues": result.issues or [],
            "suggestions": result.suggestions or []
        }
 
service = ReviewService()
 
@app.post("/webhook/github")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str = Header(None)
):
    """接收 GitHub PR Webhook"""
    body = await request.body()
    
    # 验证签名
    expected = "sha256=" + hmac.new(
        GITHUB_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(expected, x_hub_signature_256 or ""):
        raise HTTPException(401, "Invalid signature")
    
    payload = await request.json()
    if payload.get("action") not in ["opened", "synchronize"]:
        return {"ignored": True}
    
    pr = payload["pull_request"]
    job = ReviewJob(
        repo=payload["repository"]["full_name"],
        pr_number=pr["number"],
        base_sha=pr["base"]["sha"],
        head_sha=pr["head"]["sha"],
        changed_files=[f["filename"] for f in pr.get("files", [])],
        author=pr["user"]["login"]
    )
    
    # 后台执行审查
    asyncio.create_task(service.review_pr(job))
    return {"accepted": True, "message": "Review job queued"}
 
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """查询审查任务状态"""
    return service.active_jobs.get(job_id, {"status": "not_found"})
 
# 部署: uvicorn review_service:app --host 0.0.0.0 --port 8080

场景二:批量代码迁移工具(Python 2→3 跨仓库)

企业维护 200+ 遗留 Python 2 仓库,需要批量、可控地完成迁移。

# batch_migration.py
import asyncio
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
 
from claude_agent_sdk import Agent, AgentLoop, PermissionMode
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("migration")
 
@dataclass
class RepoConfig:
    name: str
    clone_url: str
    default_branch: str
    criticality: str  # high / medium / low
    test_command: str = "pytest tests/ -x -q"
 
MIGRATION_REPOS = [
    RepoConfig("billing-service", "[email protected]:company/billing.git", "main", "high", "pytest"),
    RepoConfig("user-api", "[email protected]:company/user-api.git", "master", "high", "pytest"),
    RepoConfig("report-generator", "[email protected]:company/reports.git", "main", "medium", "python -m unittest discover"),
]
 
class BatchMigration:
    def __init__(self, work_dir: str = "/tmp/migration"):
        self.work_dir = Path(work_dir)
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.results = []
        
        # 为迁移任务配置专用 Agent(允许写文件、运行 git)
        self.agent = Agent(
            model="claude-opus",
            permission_mode=PermissionMode.ASK,
            permissions={
                "file:write": "allow",   # 迁移需要修改代码
                "file:read": "allow",
                "bash:run": {
                    "git *": "allow",
                    "2to3 *": "allow",
                    "pytest *": "allow",
                    "python *": "allow",
                    "pip *": "allow",
                    "rm -rf /": "deny",
                    "*": "ask"
                }
            }
        )
    
    async def migrate_repo(self, repo: RepoConfig) -> dict:
        """迁移单个仓库"""
        repo_dir = self.work_dir / repo.name
        result = {"repo": repo.name, "status": "pending", "phases": []}
        
        try:
            # Phase 1: 克隆仓库
            logger.info(f"[{repo.name}] Phase 1: Clone")
            await self._run_bash(f"git clone {repo.clone_url} {repo_dir}")
            result["phases"].append({"phase": "clone", "status": "ok"})
            
            # Phase 2: 分析(只读)
            logger.info(f"[{repo.name}] Phase 2: Analysis")
            analysis = await self.agent.run(
                f"在 {repo_dir} 中扫描 Python 2 兼容性问题,"
                "输出 JSON 格式的详细问题清单(含文件、行号、建议修复)",
                project_dir=str(repo_dir)
            )
            result["phases"].append({"phase": "analysis", "status": "ok", "details": analysis.summary})
            
            # Phase 3: 自动修复
            logger.info(f"[{repo.name}] Phase 3: Auto-fix")
            branch_name = f"auto/migrate-py3-{os.urandom(4).hex()}"
            await self._run_bash(
                f"cd {repo_dir} && git checkout -b {branch_name}",
                project_dir=str(repo_dir)
            )
            
            fix_result = await self.agent.run(
                f"在 {repo_dir} 中自动修复所有 Python 2→3 兼容性问题,"
                "包括:print 语句、xrange、unicode 处理、除法运算、异常语法、导入调整。"
                "修复后运行 2to3 确认无遗漏。",
                project_dir=str(repo_dir)
            )
            result["phases"].append({"phase": "auto-fix", "status": "ok", "files_modified": fix_result.files_modified})
            
            # Phase 4: 测试验证
            logger.info(f"[{repo.name}] Phase 4: Verification")
            test_result = await self._run_bash(
                f"cd {repo_dir} && {repo.test_command}",
                project_dir=str(repo_dir)
            )
            tests_passed = test_result.returncode == 0
            result["phases"].append({"phase": "test", "status": "pass" if tests_passed else "fail"})
            
            # Phase 5: 提交并推送
            if tests_passed:
                await self._run_bash(
                    f"cd {repo_dir} && git add -A && "
                    f'git commit -m "chore: auto-migrate Python 2 to 3" && '
                    f"git push origin {branch_name}",
                    project_dir=str(repo_dir)
                )
                result["status"] = "success"
                result["branch"] = branch_name
            else:
                result["status"] = "needs_manual_review"
            
        except Exception as e:
            logger.exception(f"[{repo.name}] Migration failed")
            result["status"] = "error"
            result["error"] = str(e)
        
        self.results.append(result)
        return result
    
    async def _run_bash(self, command: str, project_dir: str = None):
        """辅助:运行 bash 命令"""
        # 实际使用 Agent 的 bash 工具或 subprocess
        import subprocess
        return subprocess.run(command, shell=True, capture_output=True, text=True, cwd=project_dir)
    
    async def run_all(self, max_parallel: int = 3):
        """批量执行迁移,控制并发数"""
        semaphore = asyncio.Semaphore(max_parallel)
        
        async def limited_migrate(repo):
            async with semaphore:
                return await self.migrate_repo(repo)
        
        tasks = [limited_migrate(repo) for repo in MIGRATION_REPOS]
        await asyncio.gather(*tasks)
        
        # 生成汇总报告
        report_path = self.work_dir / "migration-report.json"
        report_path.write_text(json.dumps(self.results, indent=2, ensure_ascii=False))
        logger.info(f"Report saved to {report_path}")
        
        success = sum(1 for r in self.results if r["status"] == "success")
        logger.info(f"Migration complete: {success}/{len(MIGRATION_REPOS)} succeeded")
 
# 执行
if __name__ == "__main__":
    migrator = BatchMigration()
    asyncio.run(migrator.run_all(max_parallel=2))

下一步

02. Hooks 系统:自动化工作流

分享

评论

登录 后参与讨论。

加载中…

相关文章