跳转到主内容

Claude MCP 实战:让 Agent 自主操作数据库 / 文件 / API(附完整代码)

从零搭建三个 MCP Server(文件系统 / SQLite / HTTP API),接入 Claude 构建能真正干活的智能体,附可直接运行的 Python 完整代码。

开发指南开发指南预计阅读25分钟
2026.04.17 发表
Claude MCP 实战:让 Agent 自主操作数据库 / 文件 / API(附完整代码)

Claude MCP 协议深度实战:打造能操作数据库 / 文件 / API 的智能体

从零搭建三个可直接用于生产的 MCP Server,让 Claude 真正拥有「手」, 自主完成数据查询、文件读写、第三方接口调用。


什么是 MCP,为什么它改变了 Agent 开发

MCP(Model Context Protocol) 是 Anthropic 推出的开放协议, 定义了 AI 模型与外部工具 / 数据源之间的标准通信格式。

传统做法:你需要手写 function calling schema,自己解析参数,自己处理错误。 MCP 做法:启动一个 MCP Server,Claude 自动发现它暴露的工具并调用。

┌─────────────────────────────────────────────────────┐
│                    你的 Claude Agent                  │
│                                                      │
│  用户指令 → Claude 推理 → 选择工具 → 解析结果 → 回复  │
└────────────────┬──────────────────────────────────────┘
                 │ MCP 协议(stdio / SSE)
     ┌───────────┴──────────────┐
     │                          │
┌────▼────┐  ┌──────────┐  ┌───▼────┐
│ 文件系统 │  │ SQLite DB │  │外部 API │
│  Server  │  │  Server   │  │ Server  │
└─────────┘  └──────────┘  └────────┘
┌─────────────────────────────────────────────────────┐
│                    你的 Claude Agent                  │
│                                                      │
│  用户指令 → Claude 推理 → 选择工具 → 解析结果 → 回复  │
└────────────────┬──────────────────────────────────────┘
                 │ MCP 协议(stdio / SSE)
     ┌───────────┴──────────────┐
     │                          │
┌────▼────┐  ┌──────────┐  ┌───▼────┐
│ 文件系统 │  │ SQLite DB │  │外部 API │
│  Server  │  │  Server   │  │ Server  │
└─────────┘  └──────────┘  └────────┘

本文你将构建:

  • MCP Server 1:文件系统(读 / 写 / 列目录)
  • MCP Server 2:SQLite 数据库(查询 / 插入 / 更新)
  • MCP Server 3:HTTP API 网关(调用任意 REST 接口)
  • 一个统一的 Claude Agent,同时连接三个 Server

环境准备

pip install anthropic mcp httpx
pip install anthropic mcp httpx

ClaudeAPI 密钥配置:

export ANTHROPIC_API_KEY="your-claudeapi-token"
export ANTHROPIC_BASE_URL="https://gw.claudeapi.com"
export ANTHROPIC_API_KEY="your-claudeapi-token"
export ANTHROPIC_BASE_URL="https://gw.claudeapi.com"

通过 ClaudeAPI 调用,国内直连,官方 8 折定价, 支持支付宝 / 微信充值。注册地址 →


第一部分:文件系统 MCP Server

代码实现

# mcp_file_server.py
import os
import json
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("file-system")

WORKSPACE = Path(os.environ.get("MCP_WORKSPACE", "./workspace"))
WORKSPACE.mkdir(exist_ok=True)


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="read_file",
            description="读取工作区内指定文件的内容",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "相对于工作区的文件路径"}
                },
                "required": ["path"],
            },
        ),
        Tool(
            name="write_file",
            description="将内容写入工作区内的文件,文件不存在则创建",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "相对于工作区的文件路径"},
                    "content": {"type": "string", "description": "写入内容"},
                },
                "required": ["path", "content"],
            },
        ),
        Tool(
            name="list_directory",
            description="列出工作区目录下的文件和子目录",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "相对路径,默认为根目录",
                        "default": ".",
                    }
                },
            },
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "read_file":
        target = WORKSPACE / arguments["path"]
        if not target.exists():
            return [TextContent(type="text", text=f"错误:文件 {arguments['path']} 不存在")]
        return [TextContent(type="text", text=target.read_text(encoding="utf-8"))]

    elif name == "write_file":
        target = WORKSPACE / arguments["path"]
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(arguments["content"], encoding="utf-8")
        return [TextContent(type="text", text=f"✅ 已写入:{arguments['path']}")]

    elif name == "list_directory":
        target = WORKSPACE / arguments.get("path", ".")
        if not target.is_dir():
            return [TextContent(type="text", text="错误:不是有效目录")]
        entries = [
            f"{'📁' if p.is_dir() else '📄'} {p.name}"
            for p in sorted(target.iterdir())
        ]
        return [TextContent(type="text", text="\n".join(entries) or "(空目录)")]

    return [TextContent(type="text", text=f"未知工具:{name}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))
# mcp_file_server.py
import os
import json
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("file-system")

WORKSPACE = Path(os.environ.get("MCP_WORKSPACE", "./workspace"))
WORKSPACE.mkdir(exist_ok=True)


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="read_file",
            description="读取工作区内指定文件的内容",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "相对于工作区的文件路径"}
                },
                "required": ["path"],
            },
        ),
        Tool(
            name="write_file",
            description="将内容写入工作区内的文件,文件不存在则创建",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "相对于工作区的文件路径"},
                    "content": {"type": "string", "description": "写入内容"},
                },
                "required": ["path", "content"],
            },
        ),
        Tool(
            name="list_directory",
            description="列出工作区目录下的文件和子目录",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "相对路径,默认为根目录",
                        "default": ".",
                    }
                },
            },
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "read_file":
        target = WORKSPACE / arguments["path"]
        if not target.exists():
            return [TextContent(type="text", text=f"错误:文件 {arguments['path']} 不存在")]
        return [TextContent(type="text", text=target.read_text(encoding="utf-8"))]

    elif name == "write_file":
        target = WORKSPACE / arguments["path"]
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(arguments["content"], encoding="utf-8")
        return [TextContent(type="text", text=f"✅ 已写入:{arguments['path']}")]

    elif name == "list_directory":
        target = WORKSPACE / arguments.get("path", ".")
        if not target.is_dir():
            return [TextContent(type="text", text="错误:不是有效目录")]
        entries = [
            f"{'📁' if p.is_dir() else '📄'} {p.name}"
            for p in sorted(target.iterdir())
        ]
        return [TextContent(type="text", text="\n".join(entries) or "(空目录)")]

    return [TextContent(type="text", text=f"未知工具:{name}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))

第二部分:SQLite 数据库 MCP Server

# mcp_db_server.py
import sqlite3
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("sqlite-db")
DB_PATH = "data.db"


def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="execute_query",
            description="执行 SELECT 查询,返回结果集(只读)",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "SELECT 语句"},
                    "params": {
                        "type": "array",
                        "items": {},
                        "description": "参数列表(防 SQL 注入)",
                        "default": [],
                    },
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="execute_write",
            description="执行 INSERT / UPDATE / DELETE,返回影响行数",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "写操作 SQL 语句"},
                    "params": {"type": "array", "items": {}, "default": []},
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="list_tables",
            description="列出数据库中所有表及其结构",
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    params = arguments.get("params", [])

    if name == "execute_query":
        with get_conn() as conn:
            try:
                rows = conn.execute(arguments["sql"], params).fetchall()
                if not rows:
                    return [TextContent(type="text", text="查询结果为空")]
                result = [dict(row) for row in rows]
                return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
            except Exception as e:
                return [TextContent(type="text", text=f"查询错误:{e}")]

    elif name == "execute_write":
        with get_conn() as conn:
            try:
                cursor = conn.execute(arguments["sql"], params)
                conn.commit()
                return [TextContent(type="text", text=f"✅ 执行成功,影响 {cursor.rowcount} 行")]
            except Exception as e:
                return [TextContent(type="text", text=f"写入错误:{e}")]

    elif name == "list_tables":
        with get_conn() as conn:
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            result = []
            for t in tables:
                cols = conn.execute(f"PRAGMA table_info({t['name']})").fetchall()
                col_desc = ", ".join(
                    f"{c['name']} {c['type']}" for c in cols
                )
                result.append(f"📊 {t['name']}{col_desc})")
            return [TextContent(type="text", text="\n".join(result) or "无表")]

    return [TextContent(type="text", text=f"未知工具:{name}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))
# mcp_db_server.py
import sqlite3
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("sqlite-db")
DB_PATH = "data.db"


def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="execute_query",
            description="执行 SELECT 查询,返回结果集(只读)",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "SELECT 语句"},
                    "params": {
                        "type": "array",
                        "items": {},
                        "description": "参数列表(防 SQL 注入)",
                        "default": [],
                    },
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="execute_write",
            description="执行 INSERT / UPDATE / DELETE,返回影响行数",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "写操作 SQL 语句"},
                    "params": {"type": "array", "items": {}, "default": []},
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="list_tables",
            description="列出数据库中所有表及其结构",
            inputSchema={"type": "object", "properties": {}},
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    params = arguments.get("params", [])

    if name == "execute_query":
        with get_conn() as conn:
            try:
                rows = conn.execute(arguments["sql"], params).fetchall()
                if not rows:
                    return [TextContent(type="text", text="查询结果为空")]
                result = [dict(row) for row in rows]
                return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
            except Exception as e:
                return [TextContent(type="text", text=f"查询错误:{e}")]

    elif name == "execute_write":
        with get_conn() as conn:
            try:
                cursor = conn.execute(arguments["sql"], params)
                conn.commit()
                return [TextContent(type="text", text=f"✅ 执行成功,影响 {cursor.rowcount} 行")]
            except Exception as e:
                return [TextContent(type="text", text=f"写入错误:{e}")]

    elif name == "list_tables":
        with get_conn() as conn:
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            result = []
            for t in tables:
                cols = conn.execute(f"PRAGMA table_info({t['name']})").fetchall()
                col_desc = ", ".join(
                    f"{c['name']} {c['type']}" for c in cols
                )
                result.append(f"📊 {t['name']}{col_desc})")
            return [TextContent(type="text", text="\n".join(result) or "无表")]

    return [TextContent(type="text", text=f"未知工具:{name}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))

第三部分:HTTP API MCP Server

# mcp_api_server.py
import json
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("http-api")


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="http_get",
            description="发起 GET 请求并返回响应内容",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "完整的请求 URL"},
                    "headers": {
                        "type": "object",
                        "description": "请求头键值对",
                        "default": {},
                    },
                    "params": {
                        "type": "object",
                        "description": "Query 参数键值对",
                        "default": {},
                    },
                },
                "required": ["url"],
            },
        ),
        Tool(
            name="http_post",
            description="发起 POST 请求(JSON body)并返回响应",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "完整的请求 URL"},
                    "body": {"type": "object", "description": "请求体(JSON 格式)"},
                    "headers": {"type": "object", "default": {}},
                },
                "required": ["url", "body"],
            },
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            if name == "http_get":
                resp = await client.get(
                    arguments["url"],
                    headers=arguments.get("headers", {}),
                    params=arguments.get("params", {}),
                )
            elif name == "http_post":
                resp = await client.post(
                    arguments["url"],
                    headers={"Content-Type": "application/json", **arguments.get("headers", {})},
                    json=arguments["body"],
                )
            else:
                return [TextContent(type="text", text=f"未知工具:{name}")]

            try:
                result = json.dumps(resp.json(), ensure_ascii=False, indent=2)
            except Exception:
                result = resp.text

            return [TextContent(
                type="text",
                text=f"状态码:{resp.status_code}\n\n{result}"
            )]

        except httpx.TimeoutException:
            return [TextContent(type="text", text="错误:请求超时")]
        except Exception as e:
            return [TextContent(type="text", text=f"请求错误:{e}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))
# mcp_api_server.py
import json
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("http-api")


@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="http_get",
            description="发起 GET 请求并返回响应内容",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "完整的请求 URL"},
                    "headers": {
                        "type": "object",
                        "description": "请求头键值对",
                        "default": {},
                    },
                    "params": {
                        "type": "object",
                        "description": "Query 参数键值对",
                        "default": {},
                    },
                },
                "required": ["url"],
            },
        ),
        Tool(
            name="http_post",
            description="发起 POST 请求(JSON body)并返回响应",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "完整的请求 URL"},
                    "body": {"type": "object", "description": "请求体(JSON 格式)"},
                    "headers": {"type": "object", "default": {}},
                },
                "required": ["url", "body"],
            },
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            if name == "http_get":
                resp = await client.get(
                    arguments["url"],
                    headers=arguments.get("headers", {}),
                    params=arguments.get("params", {}),
                )
            elif name == "http_post":
                resp = await client.post(
                    arguments["url"],
                    headers={"Content-Type": "application/json", **arguments.get("headers", {})},
                    json=arguments["body"],
                )
            else:
                return [TextContent(type="text", text=f"未知工具:{name}")]

            try:
                result = json.dumps(resp.json(), ensure_ascii=False, indent=2)
            except Exception:
                result = resp.text

            return [TextContent(
                type="text",
                text=f"状态码:{resp.status_code}\n\n{result}"
            )]

        except httpx.TimeoutException:
            return [TextContent(type="text", text="错误:请求超时")]
        except Exception as e:
            return [TextContent(type="text", text=f"请求错误:{e}")]


if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))

第四部分:Claude Agent 统一接入三个 Server

# agent.py
import asyncio
import json
import os
from contextlib import AsyncExitStack
import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

client = anthropic.Anthropic(
    api_key=os.environ["ANTHROPIC_API_KEY"],
    base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://gw.claudeapi.com"),
)

# 定义三个 MCP Server 启动参数
SERVERS = {
    "file": StdioServerParameters(
        command="python", args=["mcp_file_server.py"]
    ),
    "db": StdioServerParameters(
        command="python", args=["mcp_db_server.py"]
    ),
    "api": StdioServerParameters(
        command="python", args=["mcp_api_server.py"]
    ),
}


async def run_agent(user_message: str):
    async with AsyncExitStack() as stack:
        sessions: dict[str, ClientSession] = {}
        all_tools = []

        # 并行连接所有 MCP Server,收集工具列表
        for server_name, params in SERVERS.items():
            read, write = await stack.enter_async_context(stdio_client(params))
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
            sessions[server_name] = session

            tools_resp = await session.list_tools()
            for tool in tools_resp.tools:
                all_tools.append({
                    "name": f"{server_name}__{tool.name}",
                    "description": f"[{server_name}] {tool.description}",
                    "input_schema": tool.inputSchema,
                })

        print(f"\n✅ 已加载 {len(all_tools)} 个工具:{[t['name'] for t in all_tools]}\n")

        # 开始 Agent 循环
        messages = [{"role": "user", "content": user_message}]
        system = (
            "你是一个拥有文件系统、数据库、HTTP API 操作能力的智能体。"
            "工具名称格式为 server__tool_name,server 前缀表示所属系统。"
            "请根据用户需求自主选择并组合使用工具,任务完成后给出清晰总结。"
        )

        while True:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=4096,
                system=system,
                tools=all_tools,
                messages=messages,
            )

            print(f"[Claude] stop_reason: {response.stop_reason}")

            # 解析响应,处理文本和工具调用
            tool_uses = []
            for block in response.content:
                if block.type == "text":
                    print(f"\n💬 Claude:{block.text}")
                elif block.type == "tool_use":
                    tool_uses.append(block)

            if response.stop_reason == "end_turn" or not tool_uses:
                break

            # 执行所有工具调用
            messages.append({"role": "assistant", "content": response.content})
            tool_results = []

            for tool_use in tool_uses:
                server_name, tool_name = tool_use.name.split("__", 1)
                print(f"\n🔧 调用工具:{tool_use.name}")
                print(f"   参数:{json.dumps(tool_use.input, ensure_ascii=False)}")

                session = sessions[server_name]
                result = await session.call_tool(tool_name, tool_use.input)
                result_text = result.content[0].text if result.content else "(无返回)"

                print(f"   结果:{result_text[:200]}{'...' if len(result_text) > 200 else ''}")

                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": result_text,
                })

            messages.append({"role": "user", "content": tool_results})


# 演示任务
DEMO_TASKS = [
    # 文件 + 数据库组合任务
    """
    请完成以下任务:
    1. 在数据库中创建 users 表(id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)
    2. 插入三条测试数据
    3. 查询所有用户并将结果保存到工作区的 users_report.txt 文件
    4. 读取并确认文件内容正确
    """,

    # HTTP API + 文件任务
    """
    调用 https://api.github.com/repos/anthropics/anthropic-sdk-python/releases/latest
    获取最新版本信息,提取版本号和发布时间,
    写入工作区的 sdk_version.md 文件(Markdown 格式)。
    """,
]


if __name__ == "__main__":
    for i, task in enumerate(DEMO_TASKS, 1):
        print(f"\n{'='*60}")
        print(f"任务 {i}{task.strip()[:50]}...")
        print("=" * 60)
        asyncio.run(run_agent(task))
# agent.py
import asyncio
import json
import os
from contextlib import AsyncExitStack
import anthropic
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

client = anthropic.Anthropic(
    api_key=os.environ["ANTHROPIC_API_KEY"],
    base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://gw.claudeapi.com"),
)

# 定义三个 MCP Server 启动参数
SERVERS = {
    "file": StdioServerParameters(
        command="python", args=["mcp_file_server.py"]
    ),
    "db": StdioServerParameters(
        command="python", args=["mcp_db_server.py"]
    ),
    "api": StdioServerParameters(
        command="python", args=["mcp_api_server.py"]
    ),
}


async def run_agent(user_message: str):
    async with AsyncExitStack() as stack:
        sessions: dict[str, ClientSession] = {}
        all_tools = []

        # 并行连接所有 MCP Server,收集工具列表
        for server_name, params in SERVERS.items():
            read, write = await stack.enter_async_context(stdio_client(params))
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
            sessions[server_name] = session

            tools_resp = await session.list_tools()
            for tool in tools_resp.tools:
                all_tools.append({
                    "name": f"{server_name}__{tool.name}",
                    "description": f"[{server_name}] {tool.description}",
                    "input_schema": tool.inputSchema,
                })

        print(f"\n✅ 已加载 {len(all_tools)} 个工具:{[t['name'] for t in all_tools]}\n")

        # 开始 Agent 循环
        messages = [{"role": "user", "content": user_message}]
        system = (
            "你是一个拥有文件系统、数据库、HTTP API 操作能力的智能体。"
            "工具名称格式为 server__tool_name,server 前缀表示所属系统。"
            "请根据用户需求自主选择并组合使用工具,任务完成后给出清晰总结。"
        )

        while True:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=4096,
                system=system,
                tools=all_tools,
                messages=messages,
            )

            print(f"[Claude] stop_reason: {response.stop_reason}")

            # 解析响应,处理文本和工具调用
            tool_uses = []
            for block in response.content:
                if block.type == "text":
                    print(f"\n💬 Claude:{block.text}")
                elif block.type == "tool_use":
                    tool_uses.append(block)

            if response.stop_reason == "end_turn" or not tool_uses:
                break

            # 执行所有工具调用
            messages.append({"role": "assistant", "content": response.content})
            tool_results = []

            for tool_use in tool_uses:
                server_name, tool_name = tool_use.name.split("__", 1)
                print(f"\n🔧 调用工具:{tool_use.name}")
                print(f"   参数:{json.dumps(tool_use.input, ensure_ascii=False)}")

                session = sessions[server_name]
                result = await session.call_tool(tool_name, tool_use.input)
                result_text = result.content[0].text if result.content else "(无返回)"

                print(f"   结果:{result_text[:200]}{'...' if len(result_text) > 200 else ''}")

                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": result_text,
                })

            messages.append({"role": "user", "content": tool_results})


# 演示任务
DEMO_TASKS = [
    # 文件 + 数据库组合任务
    """
    请完成以下任务:
    1. 在数据库中创建 users 表(id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)
    2. 插入三条测试数据
    3. 查询所有用户并将结果保存到工作区的 users_report.txt 文件
    4. 读取并确认文件内容正确
    """,

    # HTTP API + 文件任务
    """
    调用 https://api.github.com/repos/anthropics/anthropic-sdk-python/releases/latest
    获取最新版本信息,提取版本号和发布时间,
    写入工作区的 sdk_version.md 文件(Markdown 格式)。
    """,
]


if __name__ == "__main__":
    for i, task in enumerate(DEMO_TASKS, 1):
        print(f"\n{'='*60}")
        print(f"任务 {i}{task.strip()[:50]}...")
        print("=" * 60)
        asyncio.run(run_agent(task))

运行效果示例

✅ 已加载 7 个工具:[
  'file__read_file', 'file__write_file', 'file__list_directory',
  'db__execute_query', 'db__execute_write', 'db__list_tables',
  'api__http_get', 'api__http_post'
]

[Claude] stop_reason: tool_use

🔧 调用工具:db__execute_write
   参数:{"sql": "CREATE TABLE IF NOT EXISTS users ..."}
   结果:✅ 执行成功,影响 0 行

🔧 调用工具:db__execute_write
   参数:{"sql": "INSERT INTO users VALUES ...", "params": [...]}
   结果:✅ 执行成功,影响 3 行

🔧 调用工具:db__execute_query
   参数:{"sql": "SELECT * FROM users"}
   结果:[{"id": 1, "name": "张三", ...}, ...]

🔧 调用工具:file__write_file
   参数:{"path": "users_report.txt", "content": "..."}
   结果:✅ 已写入:users_report.txt

💬 Claude:已完成所有任务。数据库中成功创建 users 表并插入 3 条记录,
查询结果已保存至 workspace/users_report.txt,文件内容经读取确认无误。
✅ 已加载 7 个工具:[
  'file__read_file', 'file__write_file', 'file__list_directory',
  'db__execute_query', 'db__execute_write', 'db__list_tables',
  'api__http_get', 'api__http_post'
]

[Claude] stop_reason: tool_use

🔧 调用工具:db__execute_write
   参数:{"sql": "CREATE TABLE IF NOT EXISTS users ..."}
   结果:✅ 执行成功,影响 0 行

🔧 调用工具:db__execute_write
   参数:{"sql": "INSERT INTO users VALUES ...", "params": [...]}
   结果:✅ 执行成功,影响 3 行

🔧 调用工具:db__execute_query
   参数:{"sql": "SELECT * FROM users"}
   结果:[{"id": 1, "name": "张三", ...}, ...]

🔧 调用工具:file__write_file
   参数:{"path": "users_report.txt", "content": "..."}
   结果:✅ 已写入:users_report.txt

💬 Claude:已完成所有任务。数据库中成功创建 users 表并插入 3 条记录,
查询结果已保存至 workspace/users_report.txt,文件内容经读取确认无误。

生产环境最佳实践

1. 工具权限分层

# 在 Server 中加入权限控制
READONLY_TOOLS = {"read_file", "list_directory", "execute_query"}
WRITE_TOOLS = {"write_file", "execute_write"}

# 生产环境可按需暴露不同工具集
# 在 Server 中加入权限控制
READONLY_TOOLS = {"read_file", "list_directory", "execute_query"}
WRITE_TOOLS = {"write_file", "execute_write"}

# 生产环境可按需暴露不同工具集

2. 使用 xhigh 努力级别处理复杂任务

response = client.messages.create(
    model="claude-opus-4-7",       # 复杂多步任务用 Opus 4.7
    max_tokens=8192,
    thinking={"type": "enabled", "budget_tokens": 4000},
    ...
)
response = client.messages.create(
    model="claude-opus-4-7",       # 复杂多步任务用 Opus 4.7
    max_tokens=8192,
    thinking={"type": "enabled", "budget_tokens": 4000},
    ...
)

3. 错误重试机制

import tenacity

@tenacity.retry(
    wait=tenacity.wait_exponential(min=1, max=10),
    stop=tenacity.stop_after_attempt(3),
)
async def call_tool_with_retry(session, tool_name, inputs):
    return await session.call_tool(tool_name, inputs)
import tenacity

@tenacity.retry(
    wait=tenacity.wait_exponential(min=1, max=10),
    stop=tenacity.stop_after_attempt(3),
)
async def call_tool_with_retry(session, tool_name, inputs):
    return await session.call_tool(tool_name, inputs)

4. MCP Server 注册表(适合多人协作)

# servers_registry.py
REGISTRY = {
    "file":     {"command": "python", "args": ["mcp_file_server.py"]},
    "db":       {"command": "python", "args": ["mcp_db_server.py"]},
    "api":      {"command": "python", "args": ["mcp_api_server.py"]},
    "postgres": {"command": "npx", "args": ["@modelcontextprotocol/server-postgres"]},
    "github":   {"command": "npx", "args": ["@modelcontextprotocol/server-github"]},
}
# servers_registry.py
REGISTRY = {
    "file":     {"command": "python", "args": ["mcp_file_server.py"]},
    "db":       {"command": "python", "args": ["mcp_db_server.py"]},
    "api":      {"command": "python", "args": ["mcp_api_server.py"]},
    "postgres": {"command": "npx", "args": ["@modelcontextprotocol/server-postgres"]},
    "github":   {"command": "npx", "args": ["@modelcontextprotocol/server-github"]},
}

常见问题

Q:MCP 和普通 function calling 有什么区别?

function calling 需要你在每次请求时手动传入工具定义,MCP 将工具定义维护在独立 Server 中,Claude 自动发现并调用,更适合工具数量多、复用场景多的 Agent。

Q:MCP Server 崩溃了怎么办?

在 Agent 循环外包一层重连逻辑,或使用 supervisor 进程管理(如 pm2supervisord)保持 Server 常驻。

Q:如何限制 Claude 只能访问特定目录 / 表?

在 MCP Server 内做路径白名单或表名过滤,Claude 感知不到底层限制,只会使用 Server 暴露的能力范围。


总结

组件 作用
mcp_file_server.py 文件读写 / 目录浏览
mcp_db_server.py SQLite 查询 / 写入
mcp_api_server.py 任意 HTTP REST 调用
agent.py 统一接入,Claude 自主规划工具调用链

MCP 让 Agent 从「聊天机器人」变成「能干活的工程师」。 结合 ClaudeAPI 的稳定接入和 Opus 4.7 的自主任务能力, 你可以将这套架构直接用于生产级的自动化工作流。


开始构建

注册 ClaudeAPI →

支持支付宝 / 微信充值,官方 8 折,国内直连。

相关文章