目标:使用 Hooks 在关键执行点插入自定义逻辑,实现自动化工作流
预计时间:40 分钟
对应官方文档:Hooks Guide、Hooks Reference
什么是 Hooks?
Hooks 是在 Claude Code 执行过程中的拦截点,让你可以:
- 在文件修改前自动格式化
- 在任务完成后发送通知
- 在运行命令前检查安全性
- 自定义 AI 的行为逻辑
用户输入 → [hook: before_think] → AI 思考 → [hook: before_tool]
→ 执行工具 → [hook: after_tool] → ... → [hook: before_exit]
```mermaid
sequenceDiagram
participant U as 用户
participant H1 as before_think
participant AI as Claude AI
participant H2 as before_tool
participant T as 工具执行
participant H3 as after_tool
participant H4 as before_exit
U->>H1: 输入任务
H1->>AI: 修改提示词 / 添加上下文
AI->>H2: 决定调用工具
H2->>T: 权限检查 / 日志记录
T->>H3: 返回执行结果
H3->>AI: 处理结果 / 副作用
loop 直到任务完成
AI->>H2: 再次调用工具
H2->>T: 执行
T->>H3: 结果
end
AI->>H4: 任务完成
H4->>U: 发送通知 / 清理资源
---
## Hook 类型
### 生命周期 Hooks
| Hook | 触发时机 | 用途 |
|------|----------|------|
| `session_start` | 会话开始时 | 初始化环境、加载配置 |
| `before_think` | AI 思考前 | 修改提示词、添加上下文 |
| `before_tool` | 调用工具前 | 权限检查、日志记录 |
| `after_tool` | 工具执行后 | 结果处理、副作用 |
| `before_file_write` | 写入文件前 | 格式化、备份 |
| `after_file_write` | 写入文件后 | 通知、验证 |
| `before_command` | 运行命令前 | 安全检查 |
| `after_command` | 命令执行后 | 结果分析 |
| `session_end` | 会话结束时 | 清理、报告 |
---
## 配置 Hooks
### 文件位置
项目/ ├── .claude/ │ └── hooks/ │ ├── pre-write.sh # 文件写入前 │ ├── post-write.sh # 文件写入后 │ ├── pre-command.sh # 命令执行前 │ └── session-start.sh # 会话开始
### 企业级 Hook 管理器(Python)
当团队有数十个 Hook 需要统一编排、监控和回滚时,纯 Shell 脚本难以维护。以下是一个可扩展的 Python Hook 管理框架:
```python
#!/usr/bin/env python3
# .claude/hooks/manager.py
"""企业级 Hook 管理器:支持链式执行、超时控制、审计日志、优雅降级"""
import json
import logging
import subprocess
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, List, Optional
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
handlers=[
logging.FileHandler("/var/log/claude-hooks.log"),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger("hook-manager")
@dataclass
class HookContext:
"""Hook 执行上下文"""
hook_name: str
event_type: str # before_file_write / after_command / ...
payload: dict # 事件数据(文件路径、命令内容等)
project_dir: Path
session_id: Optional[str] = None
start_time: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
@dataclass
class HookResult:
"""Hook 执行结果"""
success: bool
message: str
duration_ms: float
action: str = "continue" # continue / block / modify / abort
modified_payload: Optional[dict] = None
class HookRegistry:
"""Hook 注册中心"""
def __init__(self):
self._hooks: dict[str, List[Callable]] = {}
def register(self, event_type: str, handler: Callable):
self._hooks.setdefault(event_type, []).append(handler)
logger.info(f"Registered handler for {event_type}")
def execute(self, event_type: str, context: HookContext, timeout_sec: float = 30.0) -> HookResult:
handlers = self._hooks.get(event_type, [])
if not handlers:
return HookResult(True, "No handlers", 0.0)
for handler in handlers:
start = time.time()
try:
result = handler(context)
result.duration_ms = (time.time() - start) * 1000
if result.action == "block":
logger.warning(f"Hook blocked: {handler.__name__}")
return result
elif result.action == "modify" and result.modified_payload:
context.payload = result.modified_payload
except Exception as e:
logger.error(f"Hook {handler.__name__} failed: {e}")
# 企业策略:关键 Hook 失败时阻断,非关键 Hook 允许降级
if context.metadata.get("critical", False):
return HookResult(False, str(e), 0.0, action="block")
return HookResult(True, "All handlers passed", 0.0)
# ============ 具体 Hook 实现 ============
def license_header_check(context: HookContext) -> HookResult:
"""检查新文件是否包含公司许可证头"""
file_path = Path(context.payload.get("file_path", ""))
if file_path.suffix not in (".py", ".ts", ".js", ".go", ".java"):
return HookResult(True, "Skip non-source file", 0.0)
content = context.payload.get("content", "")
company_header = "Copyright (c) 2025 Company Inc."
if company_header not in content:
return HookResult(
False,
f"Missing license header in {file_path}",
0.0,
action="block"
)
return HookResult(True, "License header OK", 0.0)
def auto_backup_before_write(context: HookContext) -> HookResult:
"""写入前自动备份原文件"""
file_path = Path(context.payload.get("file_path", ""))
if file_path.exists():
backup_dir = Path("/tmp/claude-backups") / context.session_id
backup_dir.mkdir(parents=True, exist_ok=True)
backup_path = backup_dir / f"{file_path.name}.{int(time.time())}"
backup_path.write_bytes(file_path.read_bytes())
logger.info(f"Backup created: {backup_path}")
return HookResult(True, "Backup completed", 0.0)
def notify_slack_on_critical_change(context: HookContext) -> HookResult:
"""关键文件修改时通知 Slack"""
file_path = context.payload.get("file_path", "")
critical_patterns = ["auth", "payment", "security", "config.prod"]
if any(p in file_path.lower() for p in critical_patterns):
import urllib.request
webhook = context.metadata.get("slack_webhook")
if webhook:
payload = json.dumps({
"text": f"🚨 关键文件被修改: `{file_path}`\n"
f"项目: {context.project_dir}\n"
f"会话: {context.session_id}"
}).encode()
try:
urllib.request.urlopen(
urllib.request.Request(webhook, data=payload, headers={"Content-Type": "application/json"}),
timeout=5
)
except Exception as e:
logger.warning(f"Slack notify failed: {e}")
return HookResult(True, "Notification sent", 0.0)
def enforce_code_format(context: HookContext) -> HookResult:
"""强制格式化后写入"""
file_path = Path(context.payload.get("file_path", ""))
content = context.payload.get("content", "")
formatters = {
".py": ["black", "-"],
".js": ["prettier", "--parser", "babel", "--stdin-filepath", str(file_path)],
".ts": ["prettier", "--parser", "typescript", "--stdin-filepath", str(file_path)],
".go": ["gofmt"],
}
ext = file_path.suffix
if ext in formatters:
try:
proc = subprocess.run(
formatters[ext],
input=content,
capture_output=True,
text=True,
timeout=10
)
if proc.returncode == 0:
return HookResult(
True,
f"Formatted with {formatters[ext][0]}",
0.0,
action="modify",
modified_payload={**context.payload, "content": proc.stdout}
)
except Exception as e:
logger.warning(f"Format failed: {e}")
return HookResult(True, "No formatter applied", 0.0)
# ============ 初始化并注册 ============
registry = HookRegistry()
registry.register("before_file_write", license_header_check)
registry.register("before_file_write", auto_backup_before_write)
registry.register("before_file_write", enforce_code_format)
registry.register("after_file_write", notify_slack_on_critical_change)
# 实际执行入口(由 Claude Code 调用)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--event", required=True)
parser.add_argument("--payload-json", required=True)
parser.add_argument("--project-dir", default=".")
parser.add_argument("--session-id", default=None)
args = parser.parse_args()
context = HookContext(
hook_name="enterprise-manager",
event_type=args.event,
payload=json.loads(args.payload_json),
project_dir=Path(args.project_dir),
session_id=args.session_id,
metadata={"slack_webhook": os.environ.get("SLACK_WEBHOOK_URL")}
)
result = registry.execute(args.event, context)
print(json.dumps({
"success": result.success,
"message": result.message,
"action": result.action,
"modified_payload": result.modified_payload
}))
sys.exit(0 if result.success and result.action != "block" else 1)
GitHub PR 自动标签 Hook(Bash + GitHub CLI)
#!/bin/bash
# .claude/hooks/post-write-auto-label.sh
# 当 Claude 修改特定类型文件时,自动给当前 PR 打标签
FILE="$1"
PR_NUMBER=$(gh pr view --json number -q '.number' 2>/dev/null || echo "")
if [ -z "$PR_NUMBER" ]; then
exit 0
fi
# 根据文件类型打标签
if "$FILE" == *"test*" || "$FILE" == *"spec*"; then
gh pr edit "$PR_NUMBER" --add-label "auto:tests"
fi
if "$FILE" == *"migration*" || "$FILE" == *"migrate*"; then
gh pr edit "$PR_NUMBER" --add-label "auto:migration"
fi
if "$FILE" == *"api*" || "$FILE" == *"openapi*"; then
gh pr edit "$PR_NUMBER" --add-label "auto:api-change"
fi
# 安全相关文件
if "$FILE" == *"auth*" || "$FILE" == *"security*" || "$FILE" == *"permission*"; then
gh pr edit "$PR_NUMBER" --add-label "security-review-required"
fi
echo "✅ PR #$PR_NUMBER 标签已更新"示例:自动格式化
#!/bin/bash
# .claude/hooks/pre-write.sh
# 在 Claude 写入文件前自动格式化
FILE="$1" # Claude 传入的文件路径
if "$FILE" == *.py; then
black "$FILE"
elif "$FILE" == *.js || "$FILE" == *.ts; then
prettier --write "$FILE"
elif "$FILE" == *.go; then
gofmt -w "$FILE"
fi示例:安全审查
#!/bin/bash
# .claude/hooks/pre-command.sh
# 拦截危险命令
COMMAND="$1"
# 禁止删除命令
if "* || *"*; then
echo "ERROR: 危险命令被拦截: $COMMAND"
exit 1
fi
# 禁止直接操作生产数据库
if "$COMMAND" == *"psql production"* || "$COMMAND" == *"mysql -h prod"*; then
echo "ERROR: 生产数据库操作需要手动确认"
exit 1
fi
echo "OK: 命令通过安全检查"HTTP Hooks
发送通知到 Slack
# .claude/hooks/slack-notify.yaml
name: slack-notify
trigger: after_file_write
method: POST
url: https://hooks.slack.com/services/T00/B00/XXX
headers:
Content-Type: application/json
body: |
{
"text": "Claude 修改了文件: {{file_path}}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*文件修改通知*\n项目: {{project_name}}\n文件: `{{file_path}}`\n操作者: {{user}}"
}
}
]
}调用内部 API
# .claude/hooks/audit-log.yaml
name: audit-log
trigger: before_command
method: POST
url: https://internal.company.com/api/audit
headers:
Authorization: Bearer {{env.AUDIT_API_TOKEN}}
body: |
{
"action": "ai_command",
"command": "{{command}}",
"session_id": "{{session_id}}",
"timestamp": "{{timestamp}}"
}Prompt Hooks
动态修改 AI 提示词
# .claude/hooks/custom_prompt.py
def before_think(context):
"""在 AI 思考前修改提示词"""
# 根据时间添加提醒
import datetime
hour = datetime.datetime.now().hour
if hour >= 18:
context.system_prompt += "\n注意:现在是下班时间,优先给出简洁的方案。"
# 根据文件类型添加规范
if context.current_file.endswith('.py'):
context.system_prompt += "\nPython 代码必须使用类型注解。"
return contextMCP Tool Hooks
拦截 MCP 工具调用
# .claude/hooks/mcp-intercept.yaml
name: mcp-audit
trigger: before_mcp_tool
condition: "tool_name == 'postgres'"
action: ask # 数据库操作前询问确认完整案例:代码质量门禁
.claude/hooks/
├── 01-check-style.sh # 代码风格检查
├── 02-run-tests.sh # 测试执行
├── 03-security-scan.sh # 安全扫描
└── 04-notify-team.sh # 团队通知
#!/bin/bash
# 01-check-style.sh
echo "🔍 运行代码风格检查..."
if ! black --check src/; then
echo "❌ 代码格式不符合 Black 规范"
echo "建议运行: black src/"
exit 1
fi
if ! flake8 src/; then
echo "❌ 存在代码风格问题"
exit 1
fi
echo "✅ 代码风格检查通过"#!/bin/bash
# 03-security-scan.sh
echo "🔒 运行安全扫描..."
# 检查是否引入了危险函数
if grep -r "eval(" src/ || grep -r "exec(" src/; then
echo "⚠️ 发现危险函数 eval/exec,请确认安全性"
exit 1
fi
# 检查密钥泄露
if grep -r "sk-" src/ || grep -r "password=" src/; then
echo "⚠️ 可能包含敏感信息,请检查"
exit 1
fi
echo "✅ 安全扫描通过"Hooks 最佳实践
| ✅ 推荐 | ❌ 避免 |
|---|---|
| 保持 Hooks 轻量快速 | 在 Hook 中执行长时间任务 |
| 明确错误处理 | 静默失败 |
| 使用日志记录 | 无追踪的黑盒操作 |
| 幂等设计 | 重复执行产生副作用 |
| 版本控制 Hooks | 本地随意修改 |
调试 Hooks
# 查看 Hook 执行日志
claude hooks logs
# 测试单个 Hook
claude hooks test pre-write.sh --file src/test.py
# 禁用所有 Hooks
claude hooks disable
# 启用 Hooks
claude hooks enable企业级实战场景
场景一:金融级代码合规门禁(SOC 2 / 等保)
金融企业要求所有 AI 辅助生成的代码必须通过多道合规检查才能提交。以下是一整套生产级 Hooks 配置:
# .claude/hooks/compliance-gate.yaml
# 企业策略:所有 Claude 修改必须经过合规门禁
name: financial-compliance-gate
version: 1.0.0
description: 金融级代码合规检查门禁系统
hooks:
- id: pre-write-license
trigger: before_file_write
script: scripts/check_license.py
fail_action: block
timeout: 5s
description: 确保所有源文件包含公司许可证头
- id: pre-write-secrets
trigger: before_file_write
script: scripts/scan_secrets.py
fail_action: block
timeout: 10s
description: 阻止任何密钥、密码、Token 被写入代码
- id: pre-command-sandbox
trigger: before_command
script: scripts/sandbox_check.sh
fail_action: block
timeout: 3s
description: 禁止执行任何可能接触生产环境的命令
- id: post-write-sonarqube
trigger: after_file_write
script: scripts/trigger_sonar.sh
fail_action: warn
timeout: 60s
description: 触发 SonarQube 质量门禁扫描
- id: post-write-audit
trigger: after_file_write
http:
method: POST
url: https://audit.company.com/api/v1/claude-events
headers:
Authorization: Bearer {{env.AUDIT_API_TOKEN}}
X-Audit-Source: claude-code
body: |
{
"event_type": "file_write",
"file_path": "{{file_path}}",
"session_id": "{{session_id}}",
"user": "{{env.USER}}",
"timestamp": "{{timestamp}}",
"diff_summary": "{{diff_summary}}"
}
description: 写入审计日志(不可失败,失败则告警)
- id: session-end-report
trigger: session_end
http:
method: POST
url: https://compliance.company.com/api/v1/session-reports
headers:
Authorization: Bearer {{env.COMPLIANCE_TOKEN}}
body: |
{
"session_id": "{{session_id}}",
"duration_sec": {{duration_sec}},
"files_modified": {{files_modified}},
"commands_executed": {{commands_executed}},
"model": "{{model}}",
"cost_usd": {{cost_usd}}
}
description: 会话结束时上报完整使用报告# scripts/scan_secrets.py - 生产级密钥扫描
#!/usr/bin/env python3
import re
import sys
SECRETS_PATTERNS = [
(r'sk-[a-zA-Z0-9]{48}', 'OpenAI/Anthropic API Key'),
(r'AKIA[0-9A-Z]{16}', 'AWS Access Key ID'),
(r'ghp_[a-zA-Z0-9]{36}', 'GitHub Personal Access Token'),
(r'password\s*=\s*["\'][^"\']+["\']', 'Hardcoded Password'),
(r'secret\s*=\s*["\'][^"\']{8,}["\']', 'Hardcoded Secret'),
(r'-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----', 'Private Key'),
]
WHITELIST_FILES = ['.env.example', 'docker-compose.yml', 'README.md']
def scan_content(content: str, filename: str) -> list:
violations = []
lines = content.split('\n')
for line_num, line in enumerate(lines, 1):
for pattern, name in SECRETS_PATTERNS:
if re.search(pattern, line, re.IGNORECASE):
# 检查是否在白名单注释中
if '# nosec' in line or '# noqa' in line:
continue
violations.append({
'line': line_num,
'type': name,
'snippet': line.strip()[:80]
})
return violations
if __name__ == '__main__':
file_path = sys.argv[1] if len(sys.argv) > 1 else ''
content = sys.stdin.read()
if any(wl in file_path for wl in WHITELIST_FILES):
print(f"SKIP: {file_path} is whitelisted")
sys.exit(0)
violations = scan_content(content, file_path)
if violations:
print(f"BLOCKED: Found {len(violations)} secrets in {file_path}")
for v in violations:
print(f" Line {v['line']}: {v['type']} | {v['snippet']}")
sys.exit(1)
print(f"PASS: No secrets found in {file_path}")
sys.exit(0)# scripts/sandbox_check.sh - 生产环境隔离检查
#!/bin/bash
COMMAND="$1"
# 严格禁止的模式
DENY_PATTERNS=(
"rm -rf /"
"rm -rf /*"
":(){ :|:& };:"
"> /dev/sda"
"mkfs."
"dd if=/dev/zero"
"psql.*production"
"mysql -h.*prod"
"redis-cli -h.*prod"
"kubectl.*delete.*namespace"
"terraform destroy"
)
for pattern in "${DENY_PATTERNS[@]}"; do
if "$COMMAND" =~ $pattern; then
echo "BLOCKED: Dangerous command pattern detected: '$COMMAND'"
echo "Matches pattern: $pattern"
exit 1
fi
done
# 需要二次确认的模式
CONFIRM_PATTERNS=(
"pip install"
"npm install"
"docker run"
"kubectl apply"
"git push"
"git reset --hard"
)
for pattern in "${CONFIRM_PATTERNS[@]}"; do
if "$COMMAND" == *"$pattern"*; then
echo "REQUIRE_CONFIRM: '$COMMAND' requires manual approval"
exit 2 # 特殊退出码表示需要确认
fi
done
echo "PASS: Command '$COMMAND' is allowed"
exit 0场景二:自动文档同步流水线(API → 知识库)
技术团队使用 Claude 开发 API 时,要求每次修改接口代码后自动同步到 Confluence/Notion/内部知识库。
# .claude/hooks/doc-sync-pipeline.yaml
name: doc-sync-pipeline
description: API 代码变更自动同步到知识库
hooks:
- id: detect-api-change
trigger: after_file_write
condition: "file_path matches '.*(controller|router|handler|api).*\\.(py|ts|go|java)$'"
script: scripts/extract_openapi.py
description: 检测 API 文件变更并提取 OpenAPI 定义
- id: generate-markdown-doc
trigger: after_file_write
depends_on: [detect-api-change]
script: scripts/gen_api_doc.py
description: 根据代码变更生成 Markdown 文档片段
- id: sync-to-confluence
trigger: after_file_write
depends_on: [generate-markdown-doc]
http:
method: PUT
url: https://company.atlassian.net/wiki/rest/api/content/{{page_id}}
headers:
Authorization: Basic {{env.CONFLUENCE_AUTH}}
Content-Type: application/json
body: |
{
"version": {"number": {{new_version}}},
"title": "{{page_title}}",
"type": "page",
"body": {
"storage": {
"value": "{{generated_html}}",
"representation": "storage"
}
}
}
description: 将生成的文档同步到 Confluence
- id: notify-team
trigger: after_file_write
depends_on: [sync-to-confluence]
http:
method: POST
url: {{env.SLACK_WEBHOOK_URL}}
body: |
{
"text": "📚 API 文档已自动更新",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*{{file_name}}* 的变更已同步到 Confluence\n"
"<{{confluence_url}}|查看更新后的文档>"
}
}
]
}
description: 通知团队文档已更新# scripts/extract_openapi.py - 从代码中提取 OpenAPI 定义
#!/usr/bin/env python3
"""自动从 FastAPI/Flask/Express 代码中提取 API 变更"""
import ast
import json
import sys
from pathlib import Path
def extract_fastapi_routes(file_path: str) -> list:
"""从 FastAPI 文件中提取路由定义"""
content = Path(file_path).read_text()
tree = ast.parse(content)
routes = []
for node in ast.walk(tree):
if isinstance(node, ast.Call):
# 检测 @app.get / @app.post 等装饰器调用
func_name = ""
if isinstance(node.func, ast.Attribute):
func_name = node.func.attr
if func_name in ('get', 'post', 'put', 'delete', 'patch'):
route_info = {'method': func_name.upper(), 'args': []}
for kw in node.keywords:
if kw.arg == 'path' or kw.arg == 'response_model':
route_info[kw.arg] = ast.literal_eval(kw.value) if isinstance(kw.value, ast.Constant) else str(kw.value)
routes.append(route_info)
return routes
def generate_changelog(old_routes: list, new_routes: list) -> str:
"""生成 API 变更日志"""
old_paths = {r.get('path', '') for r in old_routes}
new_paths = {r.get('path', '') for r in new_routes}
added = new_paths - old_paths
removed = old_paths - new_paths
lines = ["## API 变更"]
if added:
lines.append("### 新增接口")
for path in sorted(added):
lines.append(f"- `+ {path}`")
if removed:
lines.append("### 移除接口")
for path in sorted(removed):
lines.append(f"- `- {path}`")
if not added and not removed:
lines.append("- 接口路径无变更,可能有参数或返回值调整")
return "\n".join(lines)
if __name__ == '__main__':
file_path = sys.argv[1]
routes = extract_fastapi_routes(file_path)
print(json.dumps({
"file": file_path,
"routes": routes,
"route_count": len(routes)
}, indent=2))