找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1352

积分

0

好友

189

主题
发表于 6 天前 | 查看: 19| 回复: 0

Model Context Protocol (MCP) 正在成为AI应用连接外部工具的通用标准。尽管官方SDK主要面向TypeScript和Python生态,但在众多企业级后端系统中,Java(特别是Spring Boot)依然是核心支柱。

本文将带你脱离现成框架,从零开始使用原生Spring Boot代码构建一个MCP Server,并实现与之交互的客户端。文章将重点攻克一个高阶实战挑战:在基于WebClient的Flux流式响应场景中,如何精准拦截大语言模型(LLM)发出的工具调用(Tool Call)指令,执行MCP工具后,优雅地实现递归对话,同时保持对前端的流式输出

一、效果演示

为了清晰区分来自MCP服务器的调用结果,我们在演示中让加法工具的结果额外增加100。

效果演示图

二、MCP服务器端实现

以下是一个简化的MCP服务器控制器,它提供了SSE连接建立和JSON-RPC消息处理端点。

package com.chugyoyo.cosmosagent.mcp.controller;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcRequest;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/mcp")
public class McpServerController {

    // 模拟会话存储
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    // [端点 1] SSE连接建立
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect() {
        SseEmitter emitter = new SseEmitter(30 * 60 * 1000L); // 30分钟超时
        String sessionId = "session-" + System.currentTimeMillis();
        emitters.put(sessionId, emitter);
        // MCP规范:连接后发送endpoint事件,通知客户端可发送POST请求
        try {
            emitter.send(SseEmitter.event().name("endpoint").data("/mcp/messages?sessionId=" + sessionId));
            System.out.println("Client connected: " + sessionId);
        } catch (IOException e) {
            emitters.remove(sessionId);
        }
        return emitter;
    }

    // [端点 2] 处理客户端消息 (JSON-RPC)
    @PostMapping("/messages")
    public JsonRpcResponse handleMessage(@RequestParam(required = false) String sessionId,
                                         @RequestBody JsonRpcRequest request) {
        System.out.println("Received Method: " + request.getMethod());
        // 路由逻辑
        switch (request.getMethod()) {
            case "initialize":
                // 握手阶段
                Map<String, Object> initResult = new HashMap<>();
                initResult.put("protocolVersion", "2024-11-05");
                initResult.put("serverInfo", Map.of("name", "MyJavaMcpServer", "version", "1.0"));
                initResult.put("capabilities", Map.of("tools", new HashMap<>()));
                return new JsonRpcResponse("2.0", initResult, null, request.getId());
            case "tools/list":
                // 向客户端宣告可用工具列表
                Map<String, Object> tool = new HashMap<>();
                tool.put("name", "add_numbers");
                tool.put("description", "Add two numbers together");
                // 此处简化了inputSchema定义,仅作演示
                return new JsonRpcResponse("2.0", Map.of("tools", java.util.List.of(tool)), null, request.getId());
            case "tools/call":
                // 执行具体的工具逻辑
                return executeTool(request);
            default:
                return new JsonRpcResponse("2.0", null, Map.of("code", -32601, "message", "Method not found"), request.getId());
        }
    }

    // 工具执行逻辑
    private JsonRpcResponse executeTool(JsonRpcRequest request) {
        // 为简化演示直接强转Map,生产环境建议使用Jackson ObjectMapper进行类型安全转换
        Map<String, Object> params = (Map<String, Object>) request.getParams();
        String toolName = (String) params.get("name");
        Map<String, Integer> args = JSON.parseObject(params.get("arguments").toString(), new TypeReference<Map<String, Integer>>() {});
        if ("add_numbers".equals(toolName)) {
            int result = args.get("a") + args.get("b") + 100; // 演示标识:结果额外+100
            // MCP工具返回的标准结构
            Map<String, Object> content = Map.of("type", "text", "text", String.valueOf(result));
            return new JsonRpcResponse("2.0", Map.of("content", java.util.List.of(content)), null, request.getId());
        }
        return new JsonRpcResponse("2.0", null, "Tool not found", request.getId());
    }
}

三、MCP客户端服务

客户端服务负责与MCP服务器通信,包括握手、工具发现、调用,以及将MCP工具格式转换为OpenAI兼容格式。

package com.chugyoyo.cosmosagent.mcp.service;

import com.alibaba.fastjson2.JSONObject;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcRequest;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class McpClientService {
    private final RestClient restClient;
    private final String SERVER_URL = "http://localhost:8080/mcp";

    public McpClientService(RestClient.Builder builder) {
        this.restClient = builder.baseUrl(SERVER_URL).build();
    }

    public void runDemo() {
        System.out.println("--- Starting MCP Client Demo ---");
        // 1. 发送Initialize握手
        JsonRpcRequest initReq = new JsonRpcRequest("2.0", "initialize",
                Map.of("clientInfo", Map.of("name", "JavaClient", "version", "1.0")), 1);
        JsonRpcResponse initResp = sendPost(initReq);
        System.out.println("[Init] Server Protocol Version: " +
                ((Map) initResp.getResult()).get("protocolVersion"));

        // 2. 查询可用工具
        JsonRpcRequest listReq = new JsonRpcRequest("2.0", "tools/list", null, 2);
        JsonRpcResponse listResp = sendPost(listReq);
        System.out.println("[List] Available Tools: " + listResp.getResult());

        // 3. 调用工具
        Map<String, Object> callParams = new HashMap<>();
        callParams.put("name", "add_numbers");
        callParams.put("arguments", Map.of("a", 10, "b", 25));
        JsonRpcRequest callReq = new JsonRpcRequest("2.0", "tools/call", callParams, 3);
        JsonRpcResponse callResp = sendPost(callReq);
        System.out.println("[Call] Result: " + callResp.getResult());
    }

    private JsonRpcResponse sendPost(JsonRpcRequest request) {
        log.info("Sending POST request to {} with payload: {}", SERVER_URL, request);
        return restClient.post()
                .uri("/messages")
                .contentType(org.springframework.http.MediaType.APPLICATION_JSON)
                .body(request)
                .retrieve()
                .body(JsonRpcResponse.class);
    }

    // 获取MCP工具并转换为OpenAI格式
    public List<Map<String, Object>> fetchAndConvertMcpTools() {
        JsonRpcRequest listReq = new JsonRpcRequest("2.0", "tools/list", null, 1);
        JsonRpcResponse resp = sendPost(listReq);
        Map result = (Map) resp.getResult();
        List<Map> mcpTools = (List<Map>) result.get("tools");
        List<Map<String, Object>> openAiTools = new ArrayList<>();
        for (Map mcpTool : mcpTools) {
            Map<String, Object> func = new HashMap<>();
            func.put("name", mcpTool.get("name"));
            func.put("description", mcpTool.get("description"));
            // 简化转换,MCP的inputSchema通常可兼容作为parameters
            Map<String, Object> parameters = new HashMap<>();
            parameters.put("type", "object");
            parameters.put("properties", Map.of(
                    "a", Map.of("type", "integer"),
                    "b", Map.of("type", "integer")
            ));
            func.put("parameters", parameters);
            openAiTools.add(Map.of(
                    "type", "function",
                    "function", func
            ));
        }
        return openAiTools;
    }

    public String executeMcpTool(String toolName, String argsJson) {
        Map<String, Object> callParams = new HashMap<>();
        callParams.put("name", toolName);
        callParams.put("arguments", argsJson);
        JsonRpcRequest callReq = new JsonRpcRequest("2.0", "tools/call", callParams, 3);
        JsonRpcResponse callResp = sendPost(callReq);
        System.out.println("[Call] Result: " + callResp.getResult());
        Object result = callResp.getResult();
        JSONObject resultJson = JSONObject.parseObject(JSONObject.toJSONString(result));
        return resultJson.getJSONArray("content").getJSONObject(0).getString("text");
    }
}

四、整合Chat聊天与MCP工具调用

4.1 控制器入口

提供流式聊天接口。

@PostMapping(value = "/sendMessageStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sendMessageStream(@Valid @RequestBody ChatRequest request) {
    return chatService.sendMessageStreamV2(request);
}
4.2 服务层:会话管理与消息处理

负责管理聊天会话、保存消息,并调用AI服务。

@Override
public Flux<String> sendMessageStreamV2(ChatRequest request) {
    try {
        // 获取或创建会话
        ChatSessionDTO session;
        if (request.getSessionId() != null) {
            session = chatSessionService.getSessionById(request.getSessionId());
            if (session == null) {
                return Flux.error(new RuntimeException("会话不存在"));
            }
        } else {
            session = createOrGetSession(request.getAgentId(), "默认会话 " + LocalDateTime.now());
        }
        // 保存用户消息
        ChatMessageDTO userMessage = new ChatMessageDTO();
        userMessage.setSessionId(session.getId());
        userMessage.setRole("user");
        userMessage.setContent(request.getMessage());
        chatMessageService.createMessage(userMessage);
        // 从MCP获取工具列表
        List<Map<String, Object>> tools = mcpClientService.fetchAndConvertMcpTools();
        // 准备历史消息
        List<ChatMessageDTO> messages = chatMessageService.getMessagesBySessionId(request.getSessionId());
        StringBuilder aiResponseBuilder = new StringBuilder();
        List<Map<String, Object>> collect = messages.stream().map(
                msg -> Map.of(
                        "role", msg.getRole(),
                        "content", (Object) msg.getContent()
                )
        ).collect(Collectors.toList());
        // 调用智谱AI流式服务
        return zhipuaiService.streamChat(collect, tools)
                .doOnNext(content -> {
                    log.info("收到AI回复片段: {}", content);
                    aiResponseBuilder.append(content);
                })
                .doOnComplete(() -> {
                    // 流式响应完成,保存完整的AI回复
                    String fullResponse = aiResponseBuilder.toString();
                    if (!fullResponse.isEmpty()) {
                        ChatMessageDTO aiMessage = new ChatMessageDTO();
                        aiMessage.setSessionId(session.getId());
                        aiMessage.setRole("assistant");
                        aiMessage.setContent(fullResponse);
                        chatMessageService.createMessage(aiMessage);
                        log.info("AI回复完成并保存,会话ID: {}, 内容长度: {}", session.getId(), fullResponse.length());
                    }
                })
                .onErrorResume(e -> {
                    log.error("AI服务调用失败", e);
                    // 出错时保存错误信息
                    String errorMessage = "抱歉,AI服务暂时不可用,请稍后再试。";
                    ChatMessageDTO errorMessageDto = new ChatMessageDTO();
                    errorMessageDto.setSessionId(session.getId());
                    errorMessageDto.setRole("assistant");
                    errorMessageDto.setContent(errorMessage);
                    chatMessageService.createMessage(errorMessageDto);
                    return Flux.just(errorMessage);
                });
    } catch (Exception e) {
        log.error("处理流式消息失败", e);
        return Flux.error(new RuntimeException("处理消息失败: " + e.getMessage()));
    }
}
4.3 核心难点:Flux流式处理中的工具调用与递归

这是本文要解决的关键问题:在保持前端流式接收的同时,处理LLM中途发起的工具调用,并在执行工具后递归地继续对话。

@Override
public Flux<String> streamChat(List<Map<String, Object>> messages, List<Map<String, Object>> tools) {
    try {
        // 1. 获取AI服务配置
        AIConfigurationDTO config = configurationService.getConfigurationByProvider("zhipuai");
        if (config == null || !StringUtils.hasText(config.getApiKey())) {
            return Flux.error(new RuntimeException("智谱AI配置未找到或API Key未配置"));
        }
        String baseUrl = StringUtils.hasText(config.getBaseUrl()) ? config.getBaseUrl() : "https://open.bigmodel.cn/api/paas/v4";
        String model = StringUtils.hasText(config.getModel()) ? config.getModel() : "glm-4";

        // 2. 构建请求体
        Map<String, Object> requestBody = new HashMap<>();
        requestBody.put("model", model);
        requestBody.put("stream", true);
        requestBody.put("temperature", 0.7);
        requestBody.put("max_tokens", 2048);
        requestBody.put("tools", tools);
        requestBody.put("messages", messages);

        // 3. 创建WebClient
        WebClient webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader("Authorization", "Bearer " + config.getApiKey())
                .defaultHeader("Content-Type", "application/json")
                .build();
        log.info("开始调用智谱AI流式聊天,模型: {}", model);

        // 4. 创建上下文对象,用于跨流片段累积状态
        StreamContext context = new StreamContext();
        AtomicReference<StreamContext> contextRef = new AtomicReference<>(context);

        // 5. 发起请求并处理原始流
        Flux<String> currentStream = webClient.post()
                .uri("/chat/completions")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(String.class)
                .timeout(Duration.ofSeconds(60))
                .filter(line -> !line.trim().isEmpty() && !line.equals("[DONE]"))
                .map(jsonStr -> {
                    try {
                        JsonNode root = objectMapper.readTree(jsonStr);
                        JsonNode choices = root.get("choices");
                        if (choices == null || choices.isEmpty()) return "";
                        JsonNode delta = choices.get(0).get("delta");

                        // A. 处理普通文本内容
                        if (delta.has("content") && !delta.get("content").isNull()) {
                            String text = delta.get("content").asText();
                            contextRef.get().fullContent.append(text); // 记录完整回复
                            return text; // 返回给前端显示
                        }
                        // B. 处理工具调用 (Tool Calls)
                        if (delta.has("tool_calls")) {
                            contextRef.get().isToolCall = true;
                            JsonNode toolCall = delta.get("tool_calls").get(0); // 假设一次流式传输一个工具
                            if (toolCall.has("id")) {
                                contextRef.get().toolId = toolCall.get("id").asText();
                            }
                            if (toolCall.has("function")) {
                                JsonNode func = toolCall.get("function");
                                if (func.has("name")) {
                                    contextRef.get().toolName = func.get("name").asText();
                                }
                                if (func.has("arguments")) {
                                    contextRef.get().toolArgsBuffer.append(func.get("arguments").asText());
                                }
                            }
                        }
                        return ""; // 工具调用片段不作为文本输出
                    } catch (Exception e) {
                        log.error("解析流数据块错误", e);
                        return "";
                    }
                });

        // 6. 关键优化:使用concatWith处理流结束后的逻辑(工具执行 + 递归)
        return currentStream.concatWith(Flux.defer(() -> {
            StreamContext ctx = contextRef.get();
            // 判断本轮流是否结束并检测到完整的工具调用
            if (ctx.hasCompleteToolCall()) {
                log.info("检测到工具调用: {},参数: {}", ctx.toolName, ctx.toolArgsBuffer);
                // 6.1 执行MCP工具
                String toolResult = mcpClientService.executeMcpTool(ctx.toolName, ctx.toolArgsBuffer.toString());
                // 6.2 更新对话历史
                List<Map<String, Object>> nextMessages = new ArrayList<>(messages);
                // (1) 添加Assistant的Tool Call消息
                Map<String, Object> assistantMsg = new HashMap<>();
                assistantMsg.put("role", "assistant");
                assistantMsg.put("content", null);
                Map<String, Object> toolCallObj = new HashMap<>();
                toolCallObj.put("id", ctx.toolId != null ? ctx.toolId : "call_" + UUID.randomUUID());
                toolCallObj.put("type", "function");
                toolCallObj.put("function", Map.of(
                        "name", ctx.toolName,
                        "arguments", ctx.toolArgsBuffer.toString()
                ));
                assistantMsg.put("tool_calls", List.of(toolCallObj));
                nextMessages.add(assistantMsg);
                // (2) 添加Tool的执行结果消息
                Map<String, Object> toolMsg = new HashMap<>();
                toolMsg.put("role", "tool");
                toolMsg.put("tool_call_id", toolCallObj.get("id"));
                toolMsg.put("content", toolResult);
                nextMessages.add(toolMsg);

                // 6.3 递归调用!将工具结果发回给LLM,获取最终解释
                return streamChat(nextMessages, tools);
            }
            // 如果没有工具调用,直接结束流
            return Flux.empty();
        }));
    } catch (Exception e) {
        log.error("初始化智谱AI流式服务失败", e);
        return Flux.error(new RuntimeException("初始化AI服务失败: " + e.getMessage()));
    }
}

// 内部类:用于在流处理过程中保持状态
@Data
private static class StreamContext {
    StringBuilder fullContent = new StringBuilder(); // 累积普通文本
    StringBuilder toolArgsBuffer = new StringBuilder(); // 累积工具参数(流式传输可能分片)
    String toolName = null;
    String toolId = null;
    boolean isToolCall = false;

    // 判断是否已完成一个工具调用的信息收集
    public boolean hasCompleteToolCall() {
        return isToolCall && toolName != null && toolArgsBuffer.length() > 0;
    }
}

核心设计思路:利用Flux.concatWith操作符,在主数据流结束后,检查上下文状态。如果检测到完整的工具调用,则执行MCP工具,更新对话历史,并递归调用streamChat方法开启新一轮的流式交互。这样就实现了在保持前端连接不中断的前提下,完成“LLM请求工具 -> 执行工具 -> LLM回复结果”的完整递归流程,并始终保持流式输出。




上一篇:STM32/LPC1778串口FIFO应用指南:自定义协议实现高效数据收发与中断优化
下一篇:LangChain核心能力解析:构建企业级大模型应用与面试必备技能
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 21:11 , Processed in 0.226019 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表