目标:开发自定义插件,创建团队或社区的插件市场
预计时间:45 分钟
对应官方文档:Plugins、Plugins Reference、Plugin Marketplaces
插件系统概述
Claude Code 插件可以包含:
- Skills:自定义命令
- Hooks:自动化逻辑
- MCP Servers:外部工具连接
- Subagents:专用代理模板
创建插件
插件结构
my-plugin/
├── plugin.yaml # 插件配置
├── skills/
│ └── deploy.yaml # 自定义命令
├── hooks/
│ └── pre-write.sh # 自动化钩子
├── mcp/
│ └── server.py # MCP 服务
├── subagents/
│ └── security-expert.yaml
└── README.md
plugin.yaml
name: my-team-toolkit
version: 1.2.0
description: 团队内部开发工具集
author: "Your Team <[email protected]>"
license: MIT
# 依赖的其他插件
dependencies:
- name: github-integration
version: ">=2.0.0"
- name: slack-notifier
version: "~1.5.0"
# 安装时执行
install:
script: scripts/setup.sh
# 配置 Schema
config:
schema: config-schema.json
defaults: config-defaults.yaml
实战:开发部署插件
1. 创建插件目录
mkdir -p deploy-plugin/{skills,hooks,mcp}
cd deploy-plugin
2. 编写插件配置
# plugin.yaml
name: deploy-helper
version: 1.0.0
description: 一键部署到多环境
skills:
- skills/deploy.yaml
hooks:
- hooks/post-deploy.sh
3. 编写 Skill
# skills/deploy.yaml
name: deploy
version: 1.0.0
deployments:
- name: staging
description: 部署到预发布环境
steps:
- name: test
run: pytest tests/ -x -q
- name: build
run: docker build -t app:staging .
- name: push
run: docker push registry/app:staging
- name: deploy
run: kubectl set image deployment/app app=registry/app:staging
- name: verify
run: kubectl rollout status deployment/app
- name: production
description: 部署到生产环境(蓝绿部署)
confirmation: true # 需要确认
steps:
- name: backup
run: ./scripts/backup-db.sh
- name: deploy-green
run: kubectl apply -f k8s/green/
- name: health-check
run: ./scripts/health-check.sh green
- name: switch-traffic
run: kubectl patch service app -p '{"spec":{"selector":{"version":"green"}}}'
- name: rollback-ready
run: echo "如需回滚,运行: deploy rollback"
4. 编写 Hook
#!/bin/bash
# hooks/post-deploy.sh
# Post-deploy hook: posts a "deployment finished" message to Slack.
# Usage: post-deploy.sh <env> <version>
# Requires SLACK_WEBHOOK in the environment.

# Abort with a clear message instead of posting an empty notification
ENV="${1:?usage: post-deploy.sh <env> <version>}"
VERSION="${2:?usage: post-deploy.sh <env> <version>}"
: "${SLACK_WEBHOOK:?SLACK_WEBHOOK is not set}"
# Send the notification; --fail turns an HTTP error from Slack into a
# non-zero curl exit status, --show-error keeps the reason visible
curl --fail --silent --show-error -X POST "$SLACK_WEBHOOK" \
-H 'Content-Type: application/json' \
-d "{
\"text\": \"✅ 部署完成: $ENV @$VERSION\"
}"
# 记录审计日志
echo "$(date) | $ENV | $VERSION | $USER" >> /var/log/deployments.log
5. 开发 MCP Server 插件
MCP (Model Context Protocol) 插件让 Claude 能够安全地访问外部系统和数据库。
# mcp/server.py - 企业内部数据库 MCP Server
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import AsyncIterator
import asyncpg
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Resource, Tool, TextContent,
ListResourcesRequest, ListToolsRequest,
ReadResourceRequest, CallToolRequest
)
# Connection-pool settings for the analytics database (read-only account).
# Host, port, database and credentials come from the environment; the
# remaining entries are fixed safety limits.
_env = os.environ
DB_CONFIG = dict(
    host=_env.get("DB_HOST", "db.company.internal"),
    port=int(_env.get("DB_PORT", "5432")),
    database=_env.get("DB_NAME", "analytics"),
    user=_env.get("DB_READONLY_USER", "claude_readonly"),
    password=_env.get("DB_READONLY_PASSWORD"),
    ssl="require",
    # Safety limits
    command_timeout=30,
    max_size=5,
)

# Whitelist of schema-qualified tables that may be exposed (limits data leakage)
ALLOWED_TABLES = {
    "public.users",
    "public.orders",
    "public.products",
    "analytics.daily_summary",
    "analytics.weekly_report",
}
@asynccontextmanager
async def app_lifespan(server: Server) -> AsyncIterator[dict]:
    """Manage the database connection pool for the lifetime of the server.

    Yields:
        A context dict whose "pool" entry is the asyncpg pool built from
        DB_CONFIG; handlers read it via ``request_context.lifespan_context``.
    """
    pool = await asyncpg.create_pool(**DB_CONFIG)
    try:
        yield {"pool": pool}
    finally:
        # Release the connections even when the server exits with an error
        await pool.close()


# The low-level MCP Server receives its lifespan through the constructor
server = Server("company-analytics-mcp", lifespan=app_lifespan)
@server.list_resources()
async def list_resources() -> list[Resource]:
    """Expose every whitelisted table as a resource, in sorted name order."""
    resources = []
    for qualified_name in sorted(ALLOWED_TABLES):
        # The display name is the part after the schema prefix
        short_name = qualified_name.split(".")[1]
        resources.append(
            Resource(
                uri=f"analytics://{qualified_name}",
                name=short_name,
                mimeType="application/json",
                description=f"Read-only access to {qualified_name}",
            )
        )
    return resources
@server.read_resource()
async def read_resource(uri: str) -> str:
    """Return up to 100 rows of a whitelisted table as a JSON string.

    Args:
        uri: Resource URI of the form ``analytics://<schema>.<table>``.

    Raises:
        ValueError: If the URI does not use the ``analytics://`` scheme.
        PermissionError: If the table is not in ALLOWED_TABLES.
    """
    scheme = "analytics://"
    # NOTE(review): the MCP SDK may pass a URL object instead of a plain str;
    # normalising through str() keeps the string operations below valid — confirm.
    uri_text = str(uri)
    if not uri_text.startswith(scheme):
        raise ValueError(f"Unknown resource: {uri}")
    # Drop only the leading scheme, plus any trailing slash added by URL normalisation
    table = uri_text[len(scheme):].rstrip("/")
    if table not in ALLOWED_TABLES:
        raise PermissionError(f"Access denied to table: {table}")
    pool = server.request_context.lifespan_context["pool"]
    async with pool.acquire() as conn:
        # `table` is a verbatim member of ALLOWED_TABLES here, so interpolating
        # it is safe; the fixed LIMIT bounds the result size.
        rows = await conn.fetch(f"SELECT * FROM {table} LIMIT 100")
    return json.dumps([dict(r) for r in rows], indent=2, default=str)
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the two tools this server offers: safe_query and get_table_schema."""
    # Sort so the advertised enum is stable; plain set iteration order is not
    table_names = sorted({t.split(".")[1] for t in ALLOWED_TABLES})
    return [
        Tool(
            name="safe_query",
            description="Execute a read-only SQL query against analytics database. "
            "Only SELECT statements on allowed tables are permitted.",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL SELECT query",
                    }
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="get_table_schema",
            description="Get the schema definition of an allowed table",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "enum": table_names,
                    }
                },
                "required": ["table_name"],
            },
        ),
    ]
# Keywords that must not appear as whole words in a safe_query statement
_FORBIDDEN_SQL_KEYWORDS = (
    "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "GRANT",
    "EXEC", "EXECUTE", "TRUNCATE", "COPY", "CALL", "MERGE", "INTO",
)


def _validate_safe_query(sql: str):
    """Return an error message when *sql* is not acceptable, otherwise None.

    This is a best-effort textual filter, not a SQL parser: comma-separated
    FROM lists are only checked on their first table, and ``EXTRACT(x FROM col)``
    is rejected because ``col`` looks like a table. The database role's grants
    remain the authoritative access control.
    """
    import re  # local import: `re` is not part of this file's import header

    # Whole-word match, so columns such as created_at / updated_at are accepted
    forbidden = r"\b(?:" + "|".join(_FORBIDDEN_SQL_KEYWORDS) + r")\b"
    if re.search(forbidden, sql, re.IGNORECASE):
        return "ERROR: Only SELECT queries are allowed."
    if not re.match(r"SELECT\b", sql, re.IGNORECASE):
        return "ERROR: Query must start with SELECT."
    if ";" in sql:
        return "ERROR: Only a single statement is allowed."
    # Every identifier that follows FROM/JOIN must be whitelisted
    short_names = {t.split(".")[1] for t in ALLOWED_TABLES}
    for ref in re.findall(r"\b(?:FROM|JOIN)\s+([^\s(),;]+)", sql, re.IGNORECASE):
        table = ref.replace('"', "").lower()
        if table not in ALLOWED_TABLES and table not in short_names:
            return f"ERROR: Table {table} is not allowed."
    return None


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool call to safe_query or get_table_schema.

    Args:
        name: Tool name as advertised by list_tools.
        arguments: Tool arguments ("sql" or "table_name").

    Returns:
        A single TextContent holding JSON rows, or an error message.
    """
    arguments = arguments or {}
    pool = server.request_context.lifespan_context["pool"]

    if name == "safe_query":
        # Tolerate one trailing semicolon; any other ';' is rejected below
        sql = (arguments.get("sql") or "").strip().rstrip(";").strip()
        problem = _validate_safe_query(sql)
        if problem is not None:
            return [TextContent(type="text", text=problem)]
        try:
            async with pool.acquire() as conn:
                # A read-only transaction makes the database itself refuse writes
                async with conn.transaction(readonly=True):
                    # Wrapping always caps the result at 100 rows, whatever the
                    # query says; the newlines keep a trailing "--" comment from
                    # swallowing the closing parenthesis.
                    rows = await conn.fetch(
                        f"SELECT * FROM (\n{sql}\n) AS capped_query LIMIT 100"
                    )
            result = json.dumps([dict(r) for r in rows], indent=2, default=str)
            return [TextContent(type="text", text=result)]
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising
            return [TextContent(type="text", text=f"Query error: {str(e)}")]

    elif name == "get_table_schema":
        requested = arguments.get("table_name")
        if not isinstance(requested, str) or not requested:
            return [TextContent(type="text", text="ERROR: table_name is required.")]
        # Accept either the short or the schema-qualified name of an allowed table
        matches = sorted(
            t for t in ALLOWED_TABLES if requested in (t, t.split(".")[1])
        )
        if not matches:
            return [TextContent(type="text", text=f"ERROR: Table {requested} is not allowed.")]
        schema_name, table_name = matches[0].split(".", 1)
        async with pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_schema = $1 AND table_name = $2
                ORDER BY ordinal_position
            """, schema_name, table_name)
        schema = json.dumps([dict(r) for r in rows], indent=2)
        return [TextContent(type="text", text=schema)]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
    """Run the MCP server over the stdio transport."""
    from mcp.server import NotificationOptions
    from mcp.server.stdio import stdio_server

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="company-analytics-mcp",
                server_version="1.0.0",
                # get_capabilities() requires both arguments in the MCP Python SDK
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
if __name__ == "__main__":
    asyncio.run(main())
# mcp/mcp-config.yaml
name: company-analytics-mcp
version: 1.0.0
description: 企业内部数据分析 MCP Server(只读)
mcpServers:
analytics:
command: python
args: ["mcp/server.py"]
env:
DB_HOST: "db.company.internal"
DB_READONLY_USER: "claude_readonly"
DB_READONLY_PASSWORD: "${DB_READONLY_PASSWORD}" # 由 CI/CD 注入
# 企业级资源限制
resources:
max_memory: "256Mi"
max_cpu: "500m"
# 审计配置
audit:
log_all_queries: true
log_destination: "/var/log/mcp-analytics.log"
alert_on_sensitive_tables: ["users", "orders"]
6. 打包插件
# 创建 zip 包
zip -r deploy-helper-v1.0.0.zip plugin.yaml skills/ hooks/
# 或创建 tar 包
tar czvf deploy-helper-v1.0.0.tar.gz plugin.yaml skills/ hooks/
安装插件
本地安装
# 从文件
claude plugin install ./deploy-helper-v1.0.0.zip
# 从 URL
claude plugin install https://github.com/team/plugins/releases/download/v1.0/deploy-helper.zip
在项目内使用
> /plugin enable deploy-helper
> /deploy staging
创建插件市场
市场架构
市场结构
plugin-marketplace/
├── index.json # 插件索引
├── plugins/
│ ├── deploy-helper/
│ │ ├── v1.0.0.zip
│ │ ├── v1.1.0.zip
│ │ └── manifest.json
│ └── code-review/
│ └── v2.0.0.zip
└── README.md
index.json
{
"name": "Company Internal Marketplace",
"version": "1.0.0",
"plugins": [
{
"name": "deploy-helper",
"description": "一键部署工具",
"latest": "1.1.0",
"versions": {
"1.1.0": {
"url": "plugins/deploy-helper/v1.1.0.zip",
"checksum": "sha256:abc123...",
"size": 15230
},
"1.0.0": {
"url": "plugins/deploy-helper/v1.0.0.zip",
"checksum": "sha256:def456..."
}
}
}
]
}
配置市场源
# 添加私有市场
claude plugin source add company https://plugins.company.com/index.json
# 查看可用插件
claude plugin search --source company
# 安装
claude plugin install deploy-helper --source company
企业插件商店后端(FastAPI)
# marketplace/server.py - 企业私有插件市场
import hashlib
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, File, Form, Header, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
# FastAPI application that serves the plugin index and archives
app = FastAPI(version="1.0.0", title="Claude Plugin Marketplace")

# Root directory for uploaded archives; PLUGIN_STORAGE overrides the default.
# It is created at import time so later writes can assume it exists.
STORAGE_DIR = Path(os.getenv("PLUGIN_STORAGE", "/data/plugins"))
STORAGE_DIR.mkdir(exist_ok=True, parents=True)
INDEX_FILE = STORAGE_DIR.joinpath("index.json")
# Simple API-key authentication (production should use OAuth2 / mTLS).
# Keys are read from comma-separated environment variables. Blank entries are
# dropped: "".split(",") yields [""], so without the filter an unset variable
# would make the empty string a valid key.
ADMIN_KEYS = {
    key.strip()
    for key in os.environ.get("ADMIN_API_KEYS", "").split(",")
    if key.strip()
}
READ_KEYS = {
    key.strip()
    for key in os.environ.get("READ_API_KEYS", "").split(",")
    if key.strip()
}
class PluginManifest(BaseModel):
    """Metadata submitted together with a plugin archive on upload."""

    # Restricted to lower-case letters, digits and hyphens
    name: str = Field(..., pattern=r"^[a-z0-9-]+$")
    version: str
    description: str
    author: str
    license: str = "MIT"
    # Checked against the SHA-256 hex digest of the uploaded bytes
    checksum: str
    size: int
    min_claude_version: str = "1.0.0"
    dependencies: list[str] = []
    tags: list[str] = []
class PluginIndex(BaseModel):
    """Shape of the marketplace index document."""

    name: str
    description: str
    # One dict per plugin
    plugins: list[dict]
    # ISO-8601 timestamp string
    last_updated: str
def load_index() -> dict:
    """Return the index stored in INDEX_FILE, or a fresh empty index if the file is absent."""
    if not INDEX_FILE.exists():
        # Nothing on disk yet: start with an empty plugin list
        return {
            "name": "Company Marketplace",
            "plugins": [],
            "last_updated": datetime.utcnow().isoformat(),
        }
    raw_text = INDEX_FILE.read_text()
    return json.loads(raw_text)
def save_index(index: dict):
    """Stamp *index* with the current UTC time (in place) and write it to INDEX_FILE as JSON."""
    index["last_updated"] = datetime.utcnow().isoformat()
    serialized = json.dumps(index, indent=2)
    INDEX_FILE.write_text(serialized)
def verify_auth(api_key: str, require_admin: bool = False) -> bool:
    """Check an API key against the configured key sets.

    Args:
        api_key: Key presented by the caller.
        require_admin: When True, only keys in ADMIN_KEYS are accepted.

    Returns:
        True if the key grants the requested level of access.
    """
    # An empty key never authenticates, even if a key set contains ""
    if not api_key:
        return False
    if require_admin:
        # Must not fall through to READ_KEYS: read keys are not admin keys
        return api_key in ADMIN_KEYS
    return api_key in ADMIN_KEYS or api_key in READ_KEYS
@app.get("/index.json")
async def get_index(x_api_key: str = Header(...)):
    """Return the marketplace index; responds 401 when the API key is not accepted."""
    authorized = verify_auth(x_api_key)
    if authorized:
        return load_index()
    raise HTTPException(401, "Invalid API key")
@app.get("/plugins/{plugin_name}/{version}.zip")
async def download_plugin(
    plugin_name: str,
    version: str,
    x_api_key: str = Header(...)
):
    """Send the archive for one plugin version.

    Raises:
        HTTPException: 401 for an unaccepted API key; 404 when the archive does
            not exist or the requested path falls outside the storage layout.
    """
    if not verify_auth(x_api_key):
        raise HTTPException(401, "Invalid API key")
    storage_root = STORAGE_DIR.resolve()
    file_path = (storage_root / plugin_name / f"{version}.zip").resolve()
    # The resolved file must sit exactly at <storage>/<plugin>/<version>.zip;
    # this rejects ".." and similar values in either path parameter.
    if file_path.parent.parent != storage_root or not file_path.is_file():
        raise HTTPException(404, "Plugin not found")
    return FileResponse(file_path, media_type="application/zip")
def _version_sort_key(version: str) -> tuple:
    """Sort key that orders versions numerically ("1.10.0" above "1.9.0").

    A pre-release suffix ("1.0.0-beta") ranks below the plain release.
    """
    core, _, prerelease = version.partition("-")
    numbers = tuple(
        (1, int(part), "") if part.isascii() and part.isdigit() else (0, 0, part)
        for part in core.split(".")
    )
    return (numbers, prerelease == "", prerelease)


def _plugin_archive_path(plugin_name: str, version: str) -> Path:
    """Return <storage>/<plugin>/<version>.zip, rejecting values that escape that layout."""
    storage_root = STORAGE_DIR.resolve()
    candidate = (storage_root / plugin_name / f"{version}.zip").resolve()
    if candidate.parent.parent != storage_root:
        raise HTTPException(400, "Invalid plugin name or version")
    return candidate


@app.post("/admin/plugins")
async def upload_plugin(
    manifest: str = Form(...),
    file: UploadFile = File(...),
    x_api_key: str = Header(...)
):
    """Store an uploaded plugin archive and register it in the index.

    Args:
        manifest: JSON text matching PluginManifest.
        file: The plugin archive.
        x_api_key: Admin API key.

    Raises:
        HTTPException: 403 without admin access; 400 for an invalid manifest,
            an unsafe name/version, or a checksum mismatch.
    """
    if not verify_auth(x_api_key, require_admin=True):
        raise HTTPException(403, "Admin access required")
    try:
        plugin = PluginManifest(**json.loads(manifest))
    except (ValueError, TypeError) as exc:
        # Malformed JSON or failed validation is a client error, not a 500
        raise HTTPException(400, f"Invalid manifest: {exc}") from exc

    file_path = _plugin_archive_path(plugin.name, plugin.version)
    content = await file.read()

    # Verify SHA-256 before touching the disk; accept the "sha256:" prefix
    # used in the published index format as well as a bare hex digest.
    actual_hash = hashlib.sha256(content).hexdigest()
    expected_hash = plugin.checksum.lower()
    if expected_hash.startswith("sha256:"):
        expected_hash = expected_hash[len("sha256:"):]
    if actual_hash != expected_hash:
        raise HTTPException(400, f"Checksum mismatch: expected {plugin.checksum}, got {actual_hash}")

    # Save the file
    file_path.parent.mkdir(exist_ok=True)
    file_path.write_bytes(content)

    # Update the index
    index = load_index()
    existing = next((p for p in index["plugins"] if p["name"] == plugin.name), None)
    version_info = {
        "version": plugin.version,
        "url": f"/plugins/{plugin.name}/{plugin.version}.zip",
        "checksum": plugin.checksum,
        # Record the real byte count rather than the client-declared size
        "size": len(content),
        "uploaded_at": datetime.utcnow().isoformat()
    }
    if existing:
        versions = existing.setdefault("versions", {})
        versions[plugin.version] = version_info
        existing["latest"] = max(versions, key=_version_sort_key)
        existing["description"] = plugin.description
        existing["tags"] = plugin.tags
    else:
        index["plugins"].append({
            "name": plugin.name,
            "description": plugin.description,
            "latest": plugin.version,
            "tags": plugin.tags,
            "versions": {plugin.version: version_info}
        })
    save_index(index)
    return {"status": "uploaded", "plugin": plugin.name, "version": plugin.version}


@app.delete("/admin/plugins/{plugin_name}/{version}")
async def delete_plugin(
    plugin_name: str,
    version: str,
    x_api_key: str = Header(...)
):
    """Remove one plugin version from storage and from the index.

    Succeeds even when the version is already absent.

    Raises:
        HTTPException: 403 without admin access; 400 for an unsafe name/version.
    """
    if not verify_auth(x_api_key, require_admin=True):
        raise HTTPException(403, "Admin access required")
    file_path = _plugin_archive_path(plugin_name, version)
    if file_path.exists():
        file_path.unlink()
    index = load_index()
    plugin = next((p for p in index["plugins"] if p["name"] == plugin_name), None)
    if plugin and version in plugin.get("versions", {}):
        del plugin["versions"][version]
        if plugin["versions"]:
            plugin["latest"] = max(plugin["versions"], key=_version_sort_key)
        else:
            # Last version gone: drop the plugin entry altogether
            index["plugins"] = [p for p in index["plugins"] if p["name"] != plugin_name]
        save_index(index)
    return {"status": "deleted"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
企业级插件管理
强制安装
管理员可以通过 server-managed settings 强制所有用户安装某些插件:
{
"requiredPlugins": [
{
"name": "security-guidance",
"version": ">=1.0.0",
"source": "company"
},
{
"name": "audit-logger",
"version": ">=2.0.0"
}
]
}
版本约束
{
"pluginPolicy": {
"allowlist": ["company-*", "anthropic-*"],
"denylist": ["untrusted-*"],
"versionConstraints": {
"deploy-helper": ">=1.0.0 <2.0.0"
}
}
}
企业级实战场景
场景一:多环境部署插件套件(Dev → Staging → Prod)
某电商公司需要一套标准化的多环境部署插件,确保所有团队遵循同样的发布流程。
# plugin.yaml - deploy-suite
name: deploy-suite
version: 2.1.0
description: 标准化多环境部署套件(含蓝绿、金丝雀、回滚)
author: "Platform Team <[email protected]>"
skills:
- skills/deploy.yaml
- skills/rollback.yaml
- skills/canary.yaml
hooks:
- hooks/pre-deploy-check.sh
- hooks/post-deploy-verify.sh
- hooks/notify-slack.sh
mcp:
- mcp/k8s-mcp.yaml
- mcp/datadog-mcp.yaml
subagents:
- subagents/rollback-expert.yaml
---
# skills/canary.yaml - 金丝雀发布
name: canary-deploy
version: 1.0.0
description: 生产环境金丝雀发布
steps:
pre_check:
- name: health-check-staging
run: ./scripts/health-check.sh staging
- name: validate-metrics-baseline
mcp: datadog/query
args:
query: "avg:app.error_rate{env:staging}"
threshold: "< 0.01"
canary:
- name: deploy-canary-5pct
run: |
kubectl set image deployment/app-canary \
app=registry/app:${VERSION} \
--namespace production
kubectl scale deployment/app-canary --replicas=2 -n production
- name: route-traffic-5pct
run: |
kubectl patch virtualservice app \
--type merge \
-p '{"spec":{"http":[{"route":[{"destination":{"host":"app-canary"},"weight":5},{"destination":{"host":"app"},"weight":95}]}]}}'
monitor:
- name: wait-5-minutes
sleep: 300
- name: check-canary-metrics
mcp: datadog/query
args:
queries:
- "avg:app.error_rate{env:production,version:canary}"
- "avg:app.latency_p95{env:production,version:canary}"
thresholds:
error_rate: "< 0.01"
latency_p95: "< 500ms"
promote_or_rollback:
- name: decision
condition: "canary_metrics.passed == true"
on_true:
- name: promote-100pct
run: |
kubectl set image deployment/app app=registry/app:${VERSION} -n production
kubectl scale deployment/app-canary --replicas=0 -n production
- name: notify-success
hook: notify-slack
args:
channel: "#deployments"
message: "✅ Canary promoted to 100%: ${VERSION}"
on_false:
- name: auto-rollback
skill: rollback
args:
env: production
reason: "Canary metrics failed"
# hooks/pre-deploy-check.sh - 部署前强制检查
#!/bin/bash
# Pre-deploy gate: refuses to continue unless the image exists, carries the
# required CI / security labels and, for production, a change-request id.
# Usage: pre-deploy-check.sh <env> <version>
set -euo pipefail
ENV="$1"
VERSION="$2"
echo "🔍 部署前检查: ${ENV} @ ${VERSION}"
# 1. Image existence check
if ! docker manifest inspect "registry.company.com/app:${VERSION}" > /dev/null 2>&1; then
  echo "❌ 镜像 ${VERSION} 不存在"
  exit 1
fi
# 2. Required label check
# NOTE(review): `docker inspect` presumably needs the image in the local store;
# confirm the deploy host pulls it before this hook runs.
# Labels are printed as compact JSON ({"key":"value",...}), so the patterns
# below match the JSON form rather than key=value.
LABELS=$(docker inspect "registry.company.com/app:${VERSION}" --format='{{json .Config.Labels}}')
if ! echo "$LABELS" | grep -qF '"ci.build.passed":"true"'; then
  echo "❌ 镜像未通过 CI 构建检查"
  exit 1
fi
# 3. Security scan check
if ! echo "$LABELS" | grep -qF '"security.scan.critical":"0"'; then
  echo "❌ 镜像存在关键安全漏洞"
  exit 1
fi
# 4. Change-request association (production only)
if [[ "$ENV" == "production" ]]; then
  # `|| true` keeps set -e from aborting when the label is absent
  CR_ID=$(echo "$LABELS" | grep -oP '"change\.request\.id":"\K[^"]+' || true)
  if [[ -z "$CR_ID" ]]; then
    echo "❌ 生产部署必须关联变更单"
    exit 1
  fi
  echo "✅ 关联变更单: ${CR_ID}"
fi
echo "✅ 所有部署前检查通过"
场景二:安全合规检查插件(SAST + DAST + 依赖扫描)
金融团队需要确保所有 Claude 辅助生成的代码在提交前通过完整的安全扫描。
# plugin.yaml - security-compliance
name: security-compliance
version: 3.0.0
description: 金融级安全合规检查插件
hooks:
- id: sast-scan
trigger: before_file_write
script: scripts/run-sast.py
priority: 100 # 最先执行
fail_action: block
- id: secret-scan
trigger: before_file_write
script: scripts/run-secret-scan.py
priority: 90
fail_action: block
- id: dependency-check
trigger: session_end
script: scripts/check-dependencies.py
priority: 80
fail_action: warn # 依赖问题警告但不阻断
- id: dast-trigger
trigger: after_file_write
condition: "file_path matches '.*api.*\\.(py|ts|go)$'"
script: scripts/trigger-dast.py
priority: 70
fail_action: warn
skills:
- skills/security-report.yaml
# scripts/run-sast.py - 静态应用安全测试集成
#!/usr/bin/env python3
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
def run_semgrep(file_path: str, rules: list) -> dict:
    """Run a Semgrep SAST scan on one file.

    Args:
        file_path: File to scan.
        rules: Semgrep configs (registry packs or rule-file paths).

    Returns:
        Semgrep's parsed JSON report, or ``{"error": <text>}`` when the scan
        could not be run or produced unreadable output.
    """
    cmd = ["semgrep", "--json", "--quiet", "--max-target-bytes", "1000000"]
    # Each config gets its own --config flag rather than one comma-joined value
    for rule in rules:
        cmd += ["--config", rule]
    cmd.append(file_path)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # e.g. semgrep is not installed: report it instead of crashing
        return {"error": f"failed to run semgrep: {exc}"}
    if result.returncode not in (0, 1):  # 0=no issues, 1=issues found
        return {"error": result.stderr}
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        return {"error": f"unreadable semgrep output: {exc}"}
def run_bandit(file_path: str) -> dict:
    """Run Bandit on a Python file and return its parsed JSON report.

    Returns ``{"results": []}`` when Bandit writes nothing to stdout.
    """
    completed = subprocess.run(
        ["bandit", "-f", "json", "-q", "-ll", file_path],
        capture_output=True,
        text=True,
    )
    output = completed.stdout
    if output:
        return json.loads(output)
    return {"results": []}
def generate_report(findings: list) -> str:
    """Render findings as text: a PASS line when empty, else a header plus one line per finding."""
    if len(findings) == 0:
        return "PASS: No security issues found"
    header = f"BLOCKED: Found {len(findings)} security issues"
    details = [
        f" [{item['severity']}] {item['rule']}: {item['message']} at line {item.get('line', '?')}"
        for item in findings
    ]
    return "\n".join([header, *details])
if __name__ == "__main__":
    # Entry point: scan the file named on the command line, print a report and
    # set `should_block` for the exit status that follows.
    if len(sys.argv) < 2:
        print("usage: run-sast.py <file_path>", file=sys.stderr)
        sys.exit(2)
    file_path = sys.argv[1]
    findings = []
    # Semgrep rules (OWASP Top 10 + custom rules)
    semgrep_rules = [
        "p/owasp-top-ten",
        "p/cwe-top-25",
        "p/security-audit",
        "/opt/rules/company-security.yaml"  # custom rules
    ]
    semgrep_result = run_semgrep(file_path, semgrep_rules)
    if "error" in semgrep_result:
        # Fail closed: a scan that could not run must not count as a clean scan
        findings.append({
            "tool": "semgrep",
            "severity": "ERROR",
            "rule": "semgrep-scan-failed",
            "message": str(semgrep_result["error"]).strip() or "semgrep did not complete",
        })
    for r in semgrep_result.get("results", []):
        findings.append({
            "tool": "semgrep",
            "severity": r["extra"].get("severity", "WARNING"),
            "rule": r["check_id"],
            "message": r["extra"].get("message", ""),
            "line": r["start"]["line"]
        })
    # Python files are additionally scanned with Bandit
    if file_path.endswith(".py"):
        bandit_result = run_bandit(file_path)
        for r in bandit_result.get("results", []):
            findings.append({
                "tool": "bandit",
                "severity": r["issue_severity"],
                "rule": r["test_id"],
                "message": r["issue_text"],
                "line": r["line_number"]
            })
    # Block on ERROR, HIGH or MEDIUM severity; other severities (such as the
    # WARNING default above) are reported but do not block
    block_severities = {"ERROR", "HIGH", "MEDIUM"}
    should_block = any(f["severity"] in block_severities for f in findings)
    print(generate_report(findings))
    sys.exit(1 if should_block else 0)
# skills/security-report.yaml - 生成安全审查报告
name: security-report
version: 1.0.0
description: 汇总安全扫描结果生成报告
steps:
- name: collect-findings
run: |
cat /tmp/claude-security-findings.json | jq -s 'add' > /tmp/consolidated.json
- name: generate-html-report
run: |
python scripts/render_security_report.py \
--input /tmp/consolidated.json \
--template templates/security-report.html \
--output reports/security-report-$(date +%Y%m%d).html
- name: upload-to-sonarqube
run: |
curl -X POST "${SONAR_HOST}/api/issues/upload" \
-H "Authorization: Bearer ${SONAR_TOKEN}" \
-F "file=@/tmp/consolidated.json"
- name: notify-security-team
http:
method: POST
url: "${SLACK_SECURITY_WEBHOOK}"
body: |
{
"text": "🔒 Claude Code 安全扫描完成",
"attachments": [{
"color": "{{report.risk_color}}",
"fields": [
{"title": "高危问题", "value": "{{report.critical}}", "short": true},
{"title": "中危问题", "value": "{{report.medium}}", "short": true},
{"title": "报告", "value": "<{{report.url}}|点击查看>"}
]
}]
}