写在前面:为什么你必须认真了解 MCP?
过去一年,大模型真正的瓶颈已经不在「推理能力」,而在于:
- ❌ 无法访问实时数据
- ❌ 无法安全调用企业内部系统
- ❌ 无法被工程化治理
MCP(Model Context Protocol)正是为了解决这些问题而生。
MCP 不是一个 SDK
它是 「大模型与真实世界之间的协议层」
本文将带你从 0 到 1 实现一个 MCP Server,并深入拆解它背后的设计思想。这是一个写给后端、架构以及致力于人工智能应用开发的工程师的深度实战指南。
一、MCP核心概念
1.1 什么是MCP?
MCP是一个开放协议,允许大模型安全地与外部工具和数据源交互,从而有效解决模型知识受限于训练数据的固有问题,比如无法获取最新信息或操作特定业务系统。
1.2 MCP架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LLM Client │───▶│ MCP Server │───▶│ 外部工具/数据源 │
│ (如 Claude.app) │◀───│ (我们实现的) │◀───│ (如数据库/API) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
二、动手实现MCP Server
2.1 环境搭建
# 创建项目目录
mkdir mcp-server-tutorial
cd mcp-server-tutorial
# 初始化Python环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安装依赖
pip install pydantic jsonschema
2.2 基础MCP Server实现
server.py
#!/usr/bin/env python3
"""
MCP Server 基础实现
理解MCP协议的核心原理
"""
import json
import sys
import asyncio
from typing import Dict, List, Any, Optional
from enum import Enum
from dataclasses import dataclass
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MessageType(Enum):
"""MCP消息类型"""
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
@dataclass
class McpMessage:
"""MCP消息基类"""
jsonrpc: str = "2.0"
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
raise NotImplementedError
class RequestMessage(McpMessage):
"""请求消息"""
def __init__(self, method: str, params: Dict[str, Any] = None, id: Any = None):
self.method = method
self.params = params or {}
self.id = id
def to_dict(self) -> Dict[str, Any]:
return {
"jsonrpc": self.jsonrpc,
"method": self.method,
"params": self.params,
"id": self.id
}
class ResponseMessage(McpMessage):
"""响应消息"""
def __init__(self, id: Any, result: Any = None, error: Dict[str, Any] = None):
self.id = id
self.result = result
self.error = error
def to_dict(self) -> Dict[str, Any]:
response = {
"jsonrpc": self.jsonrpc,
"id": self.id
}
if self.error:
response["error"] = self.error
else:
response["result"] = self.result
return response
class Tool:
"""MCP工具定义"""
def __init__(self, name: str, description: str, input_schema: Dict[str, Any]):
self.name = name
self.description = description
self.input_schema = input_schema
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema
}
class BaseMcpServer:
"""基础MCP Server实现"""
def __init__(self, name: str, version: str):
self.name = name
self.version = version
self.tools: Dict[str, Tool] = {}
self.handlers: Dict[str, callable] = {
"initialize": self._handle_initialize,
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
"ping": self._handle_ping
}
def register_tool(self, tool: Tool) -> None:
"""注册工具"""
self.tools[tool.name] = tool
def register_handler(self, method: str, handler: callable) -> None:
"""注册自定义处理器"""
self.handlers[method] = handler
async def handle_message(self, message_str: str) -> str:
"""处理传入的MCP消息"""
try:
message_data = json.loads(message_str)
message_type = self._get_message_type(message_data)
if message_type == MessageType.REQUEST:
return await self._handle_request(message_data)
else:
logger.warning(f"未处理的消息类型: {message_type}")
return ""
except json.JSONDecodeError:
error_response = ResponseMessage(
id=None,
error={"code": -32700, "message": "Parse error"}
)
return json.dumps(error_response.to_dict())
def _get_message_type(self, message: Dict[str, Any]) -> MessageType:
"""判断消息类型"""
if "method" in message:
return MessageType.REQUEST
elif "result" in message or "error" in message:
return MessageType.RESPONSE
else:
return MessageType.NOTIFICATION
async def _handle_request(self, request_data: Dict[str, Any]) -> str:
"""处理请求"""
method = request_data.get("method")
request_id = request_data.get("id")
if method in self.handlers:
try:
params = request_data.get("params", {})
result = await self.handlers[method](params)
if request_id is not None:
response = ResponseMessage(id=request_id, result=result)
return json.dumps(response.to_dict())
except Exception as e:
logger.error(f"处理请求 {method} 时出错: {e}")
error_response = ResponseMessage(
id=request_id,
error={"code": -32603, "message": str(e)}
)
return json.dumps(error_response.to_dict())
else:
error_response = ResponseMessage(
id=request_id,
error={"code": -32601, "message": f"Method not found: {method}"}
)
return json.dumps(error_response.to_dict())
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理初始化请求"""
logger.info("收到初始化请求")
return {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": self.name,
"version": self.version
}
}
async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""返回工具列表"""
logger.info("收到工具列表请求")
return {
"tools": [tool.to_dict() for tool in self.tools.values()]
}
async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""调用工具"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
logger.info(f"调用工具: {tool_name}, 参数: {arguments}")
if tool_name in self.tools:
# 在实际应用中,这里会调用具体的工具逻辑
return {
"content": [
{
"type": "text",
"text": f"工具 {tool_name} 被调用,参数: {arguments}"
}
]
}
else:
raise ValueError(f"工具未找到: {tool_name}")
async def _handle_ping(self, params: Dict[str, Any]) -> str:
"""处理ping请求"""
return "pong"
class CalculatorTool(Tool):
"""计算器工具"""
def __init__(self):
super().__init__(
name="calculator",
description="执行数学计算",
input_schema={
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["add", "subtract", "multiply", "divide"],
"description": "计算操作"
},
"a": {
"type": "number",
"description": "第一个数字"
},
"b": {
"type": "number",
"description": "第二个数字"
}
},
"required": ["operation", "a", "b"]
}
)
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""执行计算"""
operation = arguments.get("operation")
a = arguments.get("a")
b = arguments.get("b")
if operation == "add":
result = a + b
elif operation == "subtract":
result = a - b
elif operation == "multiply":
result = a * b
elif operation == "divide":
if b == 0:
raise ValueError("除数不能为零")
result = a / b
else:
raise ValueError(f"未知操作: {operation}")
return {
"content": [{
"type": "text",
"text": f"计算结果: {result}"
}]
}
class WebSearchTool(Tool):
"""网页搜索工具"""
def __init__(self):
super().__init__(
name="web_search",
description="搜索网页信息",
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询"
},
"limit": {
"type": "integer",
"description": "结果数量限制",
"default": 5
}
},
"required": ["query"]
}
)
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""执行搜索(模拟)"""
query = arguments.get("query")
limit = arguments.get("limit", 5)
# 模拟搜索结果
results = [
f"结果 {i+1}: 关于 '{query}' 的信息"
for i in range(min(limit, 5))
]
return {
"content": [{
"type": "text",
"text": f"搜索 '{query}' 的结果:\n" + "\n".join(results)
}]
}
class FileSystemTool(Tool):
"""文件系统工具"""
def __init__(self):
super().__init__(
name="read_file",
description="读取文件内容",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径"
}
},
"required": ["path"]
}
)
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""读取文件(模拟)"""
path = arguments.get("path")
# 模拟读取文件
content = f"这是文件 {path} 的模拟内容。\n" \
"在实际应用中,这里会读取真实文件。"
return {
"content": [{
"type": "text",
"text": content
}]
}
async def main():
"""主函数 - 运行MCP Server"""
# 创建MCP Server实例
server = BaseMcpServer(
name="TutorialMcpServer",
version="1.0.0"
)
# 注册工具
calculator = CalculatorTool()
web_search = WebSearchTool()
file_system = FileSystemTool()
server.register_tool(calculator)
server.register_tool(web_search)
server.register_tool(file_system)
# 注册工具执行处理器
async def handle_tool_call(params: Dict[str, Any]) -> Dict[str, Any]:
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "calculator":
return await calculator.execute(arguments)
elif tool_name == "web_search":
return await web_search.execute(arguments)
elif tool_name == "read_file":
return await file_system.execute(arguments)
else:
raise ValueError(f"未知工具: {tool_name}")
server.register_handler("tools/call", handle_tool_call)
# 模拟与客户端的交互
print("MCP Server 已启动,等待连接...")
print("输入 'exit' 退出")
print("-" * 50)
# 模拟客户端请求
test_requests = [
# 初始化请求
json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "TestClient",
"version": "1.0"
}
}
}),
# 获取工具列表
json.dumps({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}),
# 调用计算器工具
json.dumps({
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {
"operation": "add",
"a": 10,
"b": 5
}
}
}),
# 调用搜索工具
json.dumps({
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": {
"name": "web_search",
"arguments": {
"query": "MCP协议是什么",
"limit": 3
}
}
})
]
# 处理测试请求
for request in test_requests:
print(f"\n客户端请求: {request}")
response = await server.handle_message(request)
print(f"服务器响应: {response}")
await asyncio.sleep(1)
# 交互模式
while True:
try:
user_input = input("\n输入JSON-RPC请求 (或输入 'exit'): ").strip()
if user_input.lower() == 'exit':
break
if user_input:
response = await server.handle_message(user_input)
print(f"响应: {response}")
except KeyboardInterrupt:
break
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
asyncio.run(main())
2.3 使用标准MCP库的实现
server_advanced.py
#!/usr/bin/env python3
"""
使用官方MCP库的高级实现
"""
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client import stdio
import json
class AdvancedMcpServer:
"""高级MCP Server实现"""
def __init__(self):
self.tools = []
async def list_tools(self):
"""列出所有可用工具"""
return [
{
"name": "get_weather",
"description": "获取天气信息",
"inputSchema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
"date": {"type": "string", "description": "日期"}
},
"required": ["city"]
}
},
{
"name": "get_stock_price",
"description": "获取股票价格",
"inputSchema": {
"type": "object",
"properties": {
"symbol": {"type": "string", "description": "股票代码"},
"period": {"type": "string", "enum": ["1d", "1w", "1m"], "default": "1d"}
},
"required": ["symbol"]
}
}
]
async def call_tool(self, name: str, arguments: dict):
"""调用工具"""
if name == "get_weather":
city = arguments.get("city", "未知城市")
return {
"content": [{
"type": "text",
"text": f"{city}的天气:晴朗,25°C"
}]
}
elif name == "get_stock_price":
symbol = arguments.get("symbol", "AAPL")
return {
"content": [{
"type": "text",
"text": f"{symbol}当前价格:$150.25"
}]
}
else:
raise ValueError(f"未知工具: {name}")
async def run_stdio_server():
"""运行标准IO服务器"""
server = AdvancedMcpServer()
# 创建服务器参数
server_params = StdioServerParameters(
command="python",
args=["-c", "print('MCP Server Ready')"]
)
async with stdio.stdio_server(server_params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
# 初始化
await session.initialize()
print("MCP Server已初始化")
# 列出工具
tools = await server.list_tools()
print(f"可用工具: {json.dumps(tools, indent=2, ensure_ascii=False)}")
# 保持运行
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("服务器关闭")
if __name__ == "__main__":
asyncio.run(run_stdio_server())
三、MCP协议深度解析
3.1 协议消息流
# 协议消息示例
messages = {
# 1. 初始化
"initialize": {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": {"name": "Claude", "version": "1.0"}
}
},
# 2. 初始化响应
"initialize_response": {
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2024-11-05",
"serverInfo": {"name": "MyServer", "version": "1.0"},
"capabilities": {
"tools": {},
"resources": {},
"prompts": {}
}
}
},
# 3. 工具调用
"tool_call": {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {"operation": "add", "a": 10, "b": 5}
}
}
}
3.2 传输层实现
transport.py
"""
MCP传输层实现
支持Stdio和SSE两种传输方式
"""
import asyncio
import json
import sys
from typing import AsyncGenerator
class McpTransport:
"""MCP传输抽象基类"""
async def read_message(self) -> str:
"""读取消息"""
raise NotImplementedError
async def write_message(self, message: str) -> None:
"""写入消息"""
raise NotImplementedError
class StdioTransport(McpTransport):
"""标准输入输出传输"""
def __init__(self):
self.reader = asyncio.StreamReader()
self.writer = None
loop = asyncio.get_event_loop()
# 包装标准输入输出
loop.add_reader(sys.stdin.fileno(), self._stdin_ready)
def _stdin_ready(self):
"""标准输入就绪回调"""
data = sys.stdin.buffer.read1(1024)
if data:
self.reader.feed_data(data)
async def read_message(self) -> str:
"""从标准输入读取消息"""
data = await self.reader.readuntil(b'\n')
return data.decode('utf-8').strip()
async def write_message(self, message: str) -> None:
"""写入到标准输出"""
sys.stdout.write(message + '\n')
sys.stdout.flush()
class SseTransport(McpTransport):
"""Server-Sent Events传输"""
def __init__(self):
self.queue = asyncio.Queue()
async def read_message(self) -> str:
"""从SSE流读取消息"""
return await self.queue.get()
async def write_message(self, message: str) -> None:
"""写入SSE流"""
# 在实际实现中,这里会通过HTTP响应发送SSE事件
print(f"SSE Event: {message}")
3.3 资源与提示(Resources & Prompts)
class Resource:
"""MCP资源定义"""
def __init__(self, uri: str, name: str, description: str, mime_type: str = "text/plain"):
self.uri = uri
self.name = name
self.description = description
self.mime_type = mime_type
def to_dict(self) -> dict:
return {
"uri": self.uri,
"name": self.name,
"description": self.description,
"mimeType": self.mime_type
}
class Prompt:
"""MCP提示定义"""
def __init__(self, name: str, description: str, arguments: list = None):
self.name = name
self.description = description
self.arguments = arguments or []
def to_dict(self) -> dict:
return {
"name": self.name,
"description": self.description,
"arguments": self.arguments
}
四、实战:集成真实工具
4.1 数据库工具
import sqlite3
from contextlib import contextmanager
class DatabaseTool(Tool):
"""数据库查询工具"""
def __init__(self, db_path: str):
super().__init__(
name="query_database",
description="执行SQL查询",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL查询语句"},
"parameters": {"type": "object", "description": "查询参数"}
},
"required": ["query"]
}
)
self.db_path = db_path
@contextmanager
def get_connection(self):
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
async def execute(self, arguments: dict) -> dict:
"""执行SQL查询"""
query = arguments.get("query")
params = arguments.get("parameters", {})
with self.get_connection() as conn:
cursor = conn.cursor()
if isinstance(params, dict):
cursor.execute(query, params)
else:
cursor.execute(query)
results = cursor.fetchall()
columns = [description[0] for description in cursor.description]
# 格式化结果
formatted_results = []
for row in results:
formatted_results.append(dict(zip(columns, row)))
return {
"content": [{
"type": "text",
"text": json.dumps(formatted_results, indent=2, ensure_ascii=False)
}]
}
4.2 API工具
import aiohttp
from typing import Dict, Any
class ApiTool(Tool):
"""API调用工具"""
def __init__(self, api_config: Dict[str, Any]):
super().__init__(
name="call_api",
description="调用外部API",
input_schema={
"type": "object",
"properties": {
"endpoint": {"type": "string", "description": "API端点"},
"method": {"type": "string", "enum": ["GET", "POST", "PUT", "DELETE"], "default": "GET"},
"params": {"type": "object", "description": "请求参数"},
"headers": {"type": "object", "description": "请求头"},
"body": {"type": "object", "description": "请求体"}
},
"required": ["endpoint"]
}
)
self.api_config = api_config
async def execute(self, arguments: dict) -> dict:
"""执行API调用"""
endpoint = arguments.get("endpoint")
method = arguments.get("method", "GET")
params = arguments.get("params", {})
headers = arguments.get("headers", {})
body = arguments.get("body")
url = f"{self.api_config['base_url']}{endpoint}"
async with aiohttp.ClientSession() as session:
async with session.request(
method=method,
url=url,
params=params,
headers={**self.api_config.get('default_headers', {}), **headers},
json=body
) as response:
result = await response.json()
return {
"content": [{
"type": "text",
"text": json.dumps(result, indent=2, ensure_ascii=False)
}]
}
五、测试与调试
5.1 测试脚本
test_mcp.py
#!/usr/bin/env python3
"""
MCP Server测试脚本
"""
import asyncio
import json
from server import BaseMcpServer, CalculatorTool
async def test_server():
"""测试MCP Server"""
# 创建服务器
server = BaseMcpServer("TestServer", "1.0.0")
server.register_tool(CalculatorTool())
# 测试用例
test_cases = [
{
"name": "初始化测试",
"request": {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {"protocolVersion": "2024-11-05"}
}
},
{
"name": "工具列表测试",
"request": {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
},
{
"name": "工具调用测试",
"request": {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {
"operation": "multiply",
"a": 7,
"b": 8
}
}
}
}
]
print("开始测试MCP Server...")
print("=" * 50)
for test_case in test_cases:
print(f"\n测试: {test_case['name']}")
print(f"请求: {json.dumps(test_case['request'], indent=2)}")
response = await server.handle_message(json.dumps(test_case['request']))
print(f"响应: {response}")
await asyncio.sleep(0.5)
print("\n测试完成!")
if __name__ == "__main__":
asyncio.run(test_server())
5.2 与Claude Desktop集成
claude_desktop_config.json
{
"mcpServers": {
"tutorial-server": {
"command": "python",
"args": ["/path/to/your/server.py"],
"env": {
"PYTHONPATH": "/path/to/your/project"
}
}
}
}
六、最佳实践与性能优化
6.1 错误处理
class McpError(Exception):
"""MCP错误基类"""
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
super().__init__(f"MCP Error {code}: {message}")
class ToolNotFoundError(McpError):
"""工具未找到错误"""
def __init__(self, tool_name: str):
super().__init__(
code=-32601,
message=f"Tool not found: {tool_name}",
data={"tool_name": tool_name}
)
class ValidationError(McpError):
"""参数验证错误"""
def __init__(self, field: str, reason: str):
super().__init__(
code=-32602,
message=f"Invalid parameter: {field}",
data={"field": field, "reason": reason}
)
6.2 性能优化建议
- 连接池管理:对数据库和API连接使用连接池
- 异步处理:确保所有I/O操作都是异步的
- 缓存策略:对频繁访问的数据实现缓存
- 超时控制:设置合理的请求超时时间
- 资源限制:限制并发请求数量
MCP 的真实运行模型(很多教程没讲清楚)
前面你已经会“写一个 MCP Server”,但真正用起来时,模型是如何决定“要不要调用工具”的?
这是 MCP 最容易被误解的地方。
MCP ≠ Function Calling
关键认知纠正一句话:
MCP Server 不“控制模型”,它只“暴露能力”
模型是否调用工具,完全取决于:
- 工具 Schema 是否足够清晰
- 工具描述是否“可被模型理解”
- 当前上下文是否“触发了工具使用动机”
MCP 决策链路(真实)
📌 重点
- MCP Server 永远不直接回答用户
- MCP Server 只返回 中间能力结果
- 最后一句话永远是模型自己生成的
为什么工具 Schema 会“直接决定成功率”
一个非常真实的坑 👇
❌ 差的工具描述(模型不爱用)
{
"name": "query_database",
"description": "执行SQL查询"
}
✅ 好的工具描述(模型会主动用)
{
"name": "query_database",
"description": "当用户询问数据、统计、列表、排行、趋势时,使用该工具执行只读 SQL 查询并返回结果",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "只读 SQL,不允许 INSERT/UPDATE/DELETE"
}
}
}
}
💡 经验法则
Schema 是“写给模型看的 API 文档”,不是写给人看的
生产级 MCP Server 必须补齐的 6 个能力
你现在的实现是 教学级 / Demo 级,上线前至少要补这 6 件事。
8.1 工具权限与隔离(非常重要)
问题
MCP Server 本质是 “给模型开后门”
必须做的限制
class ToolContext:
def __init__(self, user_id: str, role: str):
self.user_id = user_id
self.role = role
async def execute(self, arguments, context: ToolContext):
if context.role != "admin":
raise PermissionError("无权访问该工具")
📌 真实企业里:
- 一个 MCP Server 往往 按业务域拆
- 不同模型 / 不同用户 → 不同 MCP Server
8.2 防止 LLM 乱执行(SQL / 文件 / API)
SQL 防护(你文章可以直接补)
def validate_sql(sql: str):
forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER"]
for kw in forbidden:
if kw.lower() in sql.lower():
raise ValidationError("query", "Only SELECT is allowed")
文件防护
import os
BASE_DIR = "/data/read_only"
real_path = os.path.realpath(path)
if not real_path.startswith(BASE_DIR):
raise PermissionError("非法路径访问")
8.3 超时 & 资源熔断(模型会“卡死你”)
async def safe_execute(coro, timeout=5):
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
raise McpError(-32000, "Tool execution timeout")
📌 生产建议
- 单个工具调用 ≤ 5s
- 单次对话工具调用 ≤ 3 次
- 并发 MCP Server 实例 ≥ 模型 QPS
8.4 MCP Server 的推荐部署方式
❌ 不推荐
✅ 推荐
MCP Server = 辅助能力侧车(Sidecar)
考虑独立的进程部署和清晰的后端架构边界。
| 维度 |
MCP |
LangChain Tool |
OpenAI Function |
| 协议 |
开放 |
框架私有 |
平台私有 |
| 传输 |
stdio / SSE |
内存调用 |
HTTP |
| 多模型 |
✅ |
⚠️ |
❌ |
| 企业可控 |
✅ |
⚠️ |
❌ |
| 工具治理 |
强 |
中 |
弱 |
📌 一句结论
MCP 是“模型工具层的基础设施”,不是 SDK
8.6 一个真实落地场景示例
智能运维 Agent
- 用户:
“查一下昨晚 3 点到 4 点错误最多的服务”
- MCP 工具链:
query_logs
aggregate_error_count
query_service_owner
- MCP Server 返回:
{
"content": [{
"type": "text",
"text": "service-a 在 03:00–04:00 期间错误最多(523 次)"
}]
}
👉 最终解释、总结、建议
仍然由模型完成。
学习资源
- 官方文档:https://modelcontextprotocol.io
- GitHub仓库:https://github.com/modelcontextprotocol
- 示例项目:https://github.com/modelcontextprotocol/servers
- 规范文档:https://spec.modelcontextprotocol.io
总结
通过这个实战教程,你已经:
- ✅ 理解了MCP的核心概念和架构
- ✅ 实现了一个基础的MCP Server
- ✅ 掌握了MCP协议的消息格式
- ✅ 学会了如何创建和注册工具
- ✅ 了解了如何与真实工具集成
- ✅ 学会了测试和调试MCP Server
MCP是一个强大的协议,它让大模型的能力可以无限扩展。掌握MCP开发,你就能为AI应用创造无限可能!如果你想与更多开发者交流此类实践,欢迎访问云栈社区的相关板块。