找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

630

积分

0

好友

90

主题
发表于 昨天 04:04 | 查看: 1| 回复: 0

诸如Cursor、Cline、Claude Code等现代AI助手(Agent),除了能够直接与大语言模型交互,更可通过模型上下文协议(Model Context Protocol, MCP) 来扩展其工具调用与信息处理能力。那么,如何完整地实现一个支持此类能力的Agent呢?其原理并不复杂,本文将完整演示如何构建一个具备此类能力的Agent:它能够调用大模型进行逻辑推理,同时支持通过MCP调用外部工具执行计算任务,并最终通过多种通信方式将结果推送给用户。

智能Agent系统架构图

后端开发中,我们使用langchain-go作为构建Agent的核心框架。然而,langchain-go虽然内置了工具调用支持,但并未原生适配通用的MCP格式。因此,我们需要一个适配器层,将MCP客户端的功能转换为langchain-go能够识别的工具,从而整合整个工作流。大模型调用部分,我们直接使用兼容OpenAI的API接口。

以下是实现的核心代码与步骤。

第一步:初始化MCP客户端与LLM

根据通信方式的不同,MCP客户端的初始化略有差异。

1. 初始化SSE (Server-Sent Events) 客户端

func NewAgent() *Agent {
    mcpClient, err := client.NewSSEMCPClient(
        "http://127.0.0.1:8080/sse",
    )
    if err != nil {
        log.Fatalf("Failed to create MCP client: %v", err)
    }

    llm, err := openai.New(
        openai.WithBaseURL("https:///api/xxx"),
        openai.WithToken("xxxxx"),
        openai.WithModel("xxxx"),
    )
    if err != nil {
        log.Fatalf("Create client: %v", err)
    }
    return &Agent{
        mcpClient: mcpClient,
        llm:       llm,
    }
}

2. 初始化Stdio(标准输入输出)客户端

func NewAgent() *Agent {
    // Create an MCP client using stdio
    mcpClient, err := client.NewStdioMCPClient(
        "./stdio/server/server", // Path to an MCP server executable
        nil,                     // Additional environment variables if needed
    )
    if err != nil {
        log.Fatalf("Failed to create MCP client: %v", err)
    }

    llm, err := openai.New(
        openai.WithBaseURL("https:///api/xxx"),
        openai.WithToken("xxxxx"),
        openai.WithModel("xxxx"),
    )
    if err != nil {
        log.Fatalf("Create client: %v", err)
    }
    return &Agent{
        mcpClient: mcpClient,
        llm:       llm,
    }
}

第二步:适配MCP工具并绑定至Agent

此步骤负责启动MCP客户端,通过适配器获取所有可用工具,并将其绑定到LangChain的Agent执行器上。

func (a *Agent) Init(ctx context.Context) {
    // Start the client
    if err := a.mcpClient.Start(ctx); err != nil {
        fmt.Printf("Failed to start client: %v", err)
    }
    // Create the adapter
    adapter, err := langchaingo_mcp_adapter.New(a.mcpClient)
    if err != nil {
        log.Fatalf("Failed to create adapter: %v", err)
    }
    // Get all tools from MCP server
    mcpTools, err := adapter.Tools()
    if err != nil {
        log.Fatalf("Failed to get tools: %v", err)
    }
    for _, tool := range mcpTools {
        fmt.Println("Tools: ", tool.Name(), tool.Description())
    }
    // Create a agent with the tools
    agent := agents.NewOneShotAgent(
        a.llm,
        mcpTools,
        agents.WithMaxIterations(3),
    )
    executor := agents.NewExecutor(agent)
    a.executor = executor
    fmt.Println("Agent executor initialized")
}

第三步:执行Agent任务

初始化完成后,即可通过执行器处理用户请求。

func (a *Agent) Execute(ctx context.Context, question string) string {
    // Use the agent
    result, err := chains.Run(
        ctx,
        a.executor,
        question,
    )
    if err != nil {
        log.Fatalf("Agent execution error: %v", err)
    }
    log.Printf("Agent result: %s", result)
    return result
}

至此,一个支持MCP的命令行Agent工具就已经完成了。但如果希望将其集成为网页应用或编辑器插件,则需要考虑如何将结果实时推送给前端。常见的有三种通信方式。

结果推送的三种实现方式

方式一:基于HTTP的长轮询(Streaming HTTP)

后端实现

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
   http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
   return
}
fmt.Fprintf(w, "%s", string(res))
flusher.Flush()

前端调用示例

async function httpSend(loadingId,currentImplementation, message) {
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: {
          'Content-Type': 'text/event-stream',
          'connection': 'keep-alive',
           'cache-control': 'no-cache',
         },
       body: JSON.stringify({
           message: message,
           implementation: currentImplementation,
     }),
   });
   const data = await response.json();
       if (data.error) {
            showError(data.error);
      } else {
         addMessage('agent', "http 方式返回数据:"+data.message);
     }
}
方式二:服务器发送事件(SSE)

后端实现

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("connection", "keep-alive")

flusher, ok := w.(http.Flusher)
if !ok {
    fmt.Println("Streaming unsupported!")
    http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
    return
}
fmt.Println("send response", string(res))
fmt.Fprintf(w, "data: %s\n\n", string(res))
flusher.Flush()

前端调用示例

eventSource = new EventSource("/api/sse?msg="+message,{ retry: 50000 });
eventSource.onmessage = function(event) {
    console.log('SSE message:', event.data);
    addMessage('agent', "sse 方式返回数据:"+event.data);
    eventSource.close();
};
方式三:WebSocket

后端连接处理

func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
    // Upgrade HTTP connection to WebSocket
    conn, err := s.upgrader.Upgrade(w, r, nil)
    if err != nil {
        s.logger.Printf("WebSocket upgrade failed: %v", err)
        return
    }
    // Get agent ID from query parameter
    agentID := r.URL.Query().Get("agentId")
    client := NewWSClient(conn, s.wsHub, agentID)
    s.wsHub.register <- client
    // Start read and write pumps
    go client.WritePump()
    go client.ReadPump()
    s.logger.Printf("New WebSocket connection established (agentId: %s)", agentID)
}

后端消息写入

func (c *WSClient) WritePump() {
    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                // Hub closed the channel
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if err := c.conn.WriteJSON(message); err != nil {
                c.hub.logger.Printf("Error writing message: %v", err)
                return
            }

后端消息读取

func (c *WSClient) ReadPump() {
    for {
        select {
         _, message, err := c.conn.ReadMessage()
         c.send <- &WSMessage{}

前端调用示例

// 发送消息
ws.send(JSON.stringify({
            type:'message',
            content: message,
            agentId: currentImplementation,
          }));

// 接收消息
ws = new WebSocket(wsUrl);
ws.onmessage = (event) => {
    try {
        const message = JSON.parse(event.data);
        handleWebSocketMessage(message);
    } catch (error) {
        console.error('Error parsing WebSocket message:', error);
    }
};

总结

Agent开发因为引入了大语言模型这个“黑盒”节点,整体流程看似复杂,但其核心原理非常清晰。目前,以LangChain为代表的AI智能体开发工具箱已经非常成熟,封装了大多数常用功能,开发者可以专注于业务逻辑与系统架构的集成。

工具调用流程示意图
通信方式对比图




上一篇:自动驾驶事故响应机制解析:ODD边界与MRM策略下的系统行为逻辑
下一篇:美团LongCat-Image开源图像生成模型实测:6B参数下的中文渲染与精准编辑能力
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-12 08:27 , Processed in 0.081019 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表