Model Context Protocol (MCP) 正在成为AI应用连接外部工具的通用标准。尽管官方SDK主要面向TypeScript和Python生态,但在众多企业级后端系统中,Java(特别是Spring Boot)依然是核心支柱。
本文将带你脱离现成框架,从零开始使用原生Spring Boot代码构建一个MCP Server,并实现与之交互的客户端。文章将重点攻克一个高阶实战挑战:在基于WebClient的Flux流式响应场景中,如何精准拦截大语言模型(LLM)发出的工具调用(Tool Call)指令,执行MCP工具后,优雅地实现递归对话,同时保持对前端的流式输出。
一、效果演示
为了便于确认结果确实来自MCP服务器(而不是LLM自行计算得出),我们在演示中让加法工具的结果额外增加100,即工具返回 a + b + 100。

二、MCP服务器端实现
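以下是一个简化的MCP服务器控制器,它提供了SSE连接建立和JSON-RPC消息处理两个端点。需要注意:为了简化演示,这里把JSON-RPC响应直接放在POST请求的HTTP响应体中返回;而在MCP规范(2024-11-05)的HTTP+SSE传输方式中,服务器的响应消息应当通过SSE连接以message事件推送给客户端。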
package com.chugyoyo.cosmosagent.mcp.controller;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcRequest;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Minimal demonstration MCP server.
 *
 * <p>Exposes two endpoints under {@code /mcp}: {@code GET /sse} opens an SSE stream and
 * announces the message endpoint, and {@code POST /messages} handles the JSON-RPC methods
 * {@code initialize}, {@code tools/list} and {@code tools/call}. JSON-RPC responses are
 * returned directly in the HTTP response body of the POST request.
 */
@RestController
@RequestMapping("/mcp")
public class McpServerController {

    private static final String JSON_RPC_VERSION = "2.0";
    private static final long SSE_TIMEOUT_MILLIS = 30 * 60 * 1000L; // 30 minutes
    private static final String ADD_NUMBERS_TOOL = "add_numbers";

    // JSON-RPC 2.0 error codes used by this controller.
    private static final int INVALID_REQUEST = -32600;
    private static final int METHOD_NOT_FOUND = -32601;
    private static final int INVALID_PARAMS = -32602;

    /** Open SSE connections keyed by session id. */
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    /**
     * [Endpoint 1] Opens the SSE stream and sends the {@code endpoint} event that tells the
     * client where to POST its JSON-RPC messages.
     *
     * @return the emitter backing the SSE connection
     */
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect() {
        SseEmitter emitter = new SseEmitter(SSE_TIMEOUT_MILLIS);
        // A random UUID cannot collide when two clients connect in the same millisecond
        // and is not guessable the way a timestamp is.
        String sessionId = "session-" + java.util.UUID.randomUUID();
        emitters.put(sessionId, emitter);

        // Drop the emitter once the connection ends, otherwise the map grows forever.
        emitter.onCompletion(() -> emitters.remove(sessionId));
        emitter.onTimeout(() -> emitters.remove(sessionId));
        emitter.onError(error -> emitters.remove(sessionId));

        try {
            emitter.send(SseEmitter.event().name("endpoint").data("/mcp/messages?sessionId=" + sessionId));
            System.out.println("Client connected: " + sessionId);
        } catch (IOException e) {
            emitters.remove(sessionId);
            System.err.println("Failed to send endpoint event for " + sessionId + ": " + e.getMessage());
        }
        return emitter;
    }

    /**
     * [Endpoint 2] Dispatches a JSON-RPC request to the matching handler.
     *
     * @param sessionId session id announced by the {@code endpoint} event; currently unused
     * @param request   the JSON-RPC request
     * @return the JSON-RPC response; unknown or missing methods yield an error response
     */
    @PostMapping("/messages")
    public JsonRpcResponse handleMessage(@RequestParam(required = false) String sessionId,
                                         @RequestBody JsonRpcRequest request) {
        String method = request.getMethod();
        System.out.println("Received Method: " + method);
        if (method == null) {
            // switch on a null String would throw a NullPointerException.
            return errorResponse(request, INVALID_REQUEST, "Invalid Request: missing method");
        }
        switch (method) {
            case "initialize":
                return new JsonRpcResponse(JSON_RPC_VERSION, initializeResult(), null, request.getId());
            case "tools/list":
                return new JsonRpcResponse(JSON_RPC_VERSION,
                        Map.of("tools", java.util.List.of(addNumbersTool())), null, request.getId());
            case "tools/call":
                return executeTool(request);
            default:
                return errorResponse(request, METHOD_NOT_FOUND, "Method not found");
        }
    }

    /** Builds the handshake result: protocol version, server info and capabilities. */
    private Map<String, Object> initializeResult() {
        Map<String, Object> initResult = new HashMap<>();
        initResult.put("protocolVersion", "2024-11-05");
        initResult.put("serverInfo", Map.of("name", "MyJavaMcpServer", "version", "1.0"));
        initResult.put("capabilities", Map.of("tools", new HashMap<>()));
        return initResult;
    }

    /** Describes the {@code add_numbers} tool, including the JSON Schema of its arguments. */
    private Map<String, Object> addNumbersTool() {
        Map<String, Object> inputSchema = new HashMap<>();
        inputSchema.put("type", "object");
        inputSchema.put("properties", Map.of(
                "a", Map.of("type", "integer"),
                "b", Map.of("type", "integer")));
        inputSchema.put("required", java.util.List.of("a", "b"));

        Map<String, Object> tool = new HashMap<>();
        tool.put("name", ADD_NUMBERS_TOOL);
        tool.put("description", "Add two numbers together");
        tool.put("inputSchema", inputSchema);
        return tool;
    }

    /**
     * Executes a {@code tools/call} request.
     *
     * @param request the JSON-RPC request whose params carry {@code name} and {@code arguments}
     * @return a result with a single text content item, or a JSON-RPC error object when the
     *         params are malformed or the tool is unknown
     */
    private JsonRpcResponse executeTool(JsonRpcRequest request) {
        if (!(request.getParams() instanceof Map)) {
            return errorResponse(request, INVALID_PARAMS, "Invalid params: expected an object");
        }
        Map<?, ?> params = (Map<?, ?>) request.getParams();
        Object toolName = params.get("name");
        if (!ADD_NUMBERS_TOOL.equals(toolName)) {
            return errorResponse(request, INVALID_PARAMS, "Unknown tool: " + toolName);
        }

        Map<String, Integer> args;
        try {
            args = parseArguments(params.get("arguments"));
        } catch (RuntimeException e) {
            // Boundary with untrusted input: any parse/conversion failure is a client error.
            return errorResponse(request, INVALID_PARAMS, "Invalid params: " + e.getMessage());
        }
        Integer a = args == null ? null : args.get("a");
        Integer b = args == null ? null : args.get("b");
        if (a == null || b == null) {
            return errorResponse(request, INVALID_PARAMS, "Invalid params: 'a' and 'b' are required integers");
        }

        int result = a + b + 100; // demo marker: the result is deliberately offset by 100
        Map<String, Object> content = Map.of("type", "text", "text", String.valueOf(result));
        return new JsonRpcResponse(JSON_RPC_VERSION,
                Map.of("content", java.util.List.of(content)), null, request.getId());
    }

    /**
     * Converts the raw {@code arguments} value into a typed map. The value may arrive either
     * as a JSON string or as an already-deserialized object; calling {@code toString()} on
     * the latter would not produce JSON, so it is re-serialized first.
     */
    private Map<String, Integer> parseArguments(Object rawArguments) {
        if (rawArguments == null) {
            return null;
        }
        String json = rawArguments instanceof String
                ? (String) rawArguments
                : JSON.toJSONString(rawArguments);
        return JSON.parseObject(json, new TypeReference<Map<String, Integer>>() {});
    }

    /** Builds a JSON-RPC error response carrying a {@code code}/{@code message} error object. */
    private JsonRpcResponse errorResponse(JsonRpcRequest request, int code, String message) {
        return new JsonRpcResponse(JSON_RPC_VERSION, null,
                Map.of("code", code, "message", message), request.getId());
    }
}
三、MCP客户端服务
客户端服务负责与MCP服务器通信,包括握手、工具发现、调用,以及将MCP工具格式转换为OpenAI兼容格式。
package com.chugyoyo.cosmosagent.mcp.service;
import com.alibaba.fastjson2.JSONObject;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcRequest;
import com.chugyoyo.cosmosagent.mcp.model.JsonRpcResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class McpClientService {
private final RestClient restClient;
private final String SERVER_URL = "http://localhost:8080/mcp";
public McpClientService(RestClient.Builder builder) {
this.restClient = builder.baseUrl(SERVER_URL).build();
}
public void runDemo() {
System.out.println("--- Starting MCP Client Demo ---");
// 1. 发送Initialize握手
JsonRpcRequest initReq = new JsonRpcRequest("2.0", "initialize",
Map.of("clientInfo", Map.of("name", "JavaClient", "version", "1.0")), 1);
JsonRpcResponse initResp = sendPost(initReq);
System.out.println("[Init] Server Protocol Version: " +
((Map) initResp.getResult()).get("protocolVersion"));
// 2. 查询可用工具
JsonRpcRequest listReq = new JsonRpcRequest("2.0", "tools/list", null, 2);
JsonRpcResponse listResp = sendPost(listReq);
System.out.println("[List] Available Tools: " + listResp.getResult());
// 3. 调用工具
Map<String, Object> callParams = new HashMap<>();
callParams.put("name", "add_numbers");
callParams.put("arguments", Map.of("a", 10, "b", 25));
JsonRpcRequest callReq = new JsonRpcRequest("2.0", "tools/call", callParams, 3);
JsonRpcResponse callResp = sendPost(callReq);
System.out.println("[Call] Result: " + callResp.getResult());
}
private JsonRpcResponse sendPost(JsonRpcRequest request) {
log.info("Sending POST request to {} with payload: {}", SERVER_URL, request);
return restClient.post()
.uri("/messages")
.contentType(org.springframework.http.MediaType.APPLICATION_JSON)
.body(request)
.retrieve()
.body(JsonRpcResponse.class);
}
// 获取MCP工具并转换为OpenAI格式
public List<Map<String, Object>> fetchAndConvertMcpTools() {
JsonRpcRequest listReq = new JsonRpcRequest("2.0", "tools/list", null, 1);
JsonRpcResponse resp = sendPost(listReq);
Map result = (Map) resp.getResult();
List<Map> mcpTools = (List<Map>) result.get("tools");
List<Map<String, Object>> openAiTools = new ArrayList<>();
for (Map mcpTool : mcpTools) {
Map<String, Object> func = new HashMap<>();
func.put("name", mcpTool.get("name"));
func.put("description", mcpTool.get("description"));
// 简化转换,MCP的inputSchema通常可兼容作为parameters
Map<String, Object> parameters = new HashMap<>();
parameters.put("type", "object");
parameters.put("properties", Map.of(
"a", Map.of("type", "integer"),
"b", Map.of("type", "integer")
));
func.put("parameters", parameters);
openAiTools.add(Map.of(
"type", "function",
"function", func
));
}
return openAiTools;
}
public String executeMcpTool(String toolName, String argsJson) {
Map<String, Object> callParams = new HashMap<>();
callParams.put("name", toolName);
callParams.put("arguments", argsJson);
JsonRpcRequest callReq = new JsonRpcRequest("2.0", "tools/call", callParams, 3);
JsonRpcResponse callResp = sendPost(callReq);
System.out.println("[Call] Result: " + callResp.getResult());
Object result = callResp.getResult();
JSONObject resultJson = JSONObject.parseObject(JSONObject.toJSONString(result));
return resultJson.getJSONArray("content").getJSONObject(0).getString("text");
}
}
四、整合Chat聊天与MCP工具调用
4.1 控制器入口
提供流式聊天接口。
/**
 * Streaming chat endpoint, declared to produce {@code text/event-stream}.
 *
 * @param request the validated chat request body
 * @return the stream obtained from {@code chatService.sendMessageStreamV2}
 */
@PostMapping(value = "/sendMessageStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sendMessageStream(@Valid @RequestBody ChatRequest request) {
    // Pure delegation: all session handling and streaming happens in the service.
    final Flux<String> replyStream = chatService.sendMessageStreamV2(request);
    return replyStream;
}
4.2 服务层:会话管理与消息处理
负责管理聊天会话、保存消息,并调用AI服务。
/**
 * Handles one user chat turn and streams the assistant's reply.
 *
 * <p>Resolves the session (looked up by id, or obtained via {@code createOrGetSession} when
 * the request has no session id), stores the user message, fetches the MCP tool list, loads
 * the session history and forwards both to the AI streaming service. The full reply is
 * stored when the stream completes; on a stream error a fallback message is stored and
 * emitted instead.
 *
 * @param request the chat request
 * @return the reply fragments; an error Flux if the session does not exist or setup fails
 */
@Override
public Flux<String> sendMessageStreamV2(ChatRequest request) {
    try {
        // Resolve the session
        final ChatSessionDTO session;
        if (request.getSessionId() != null) {
            session = chatSessionService.getSessionById(request.getSessionId());
            if (session == null) {
                return Flux.error(new RuntimeException("会话不存在"));
            }
        } else {
            session = createOrGetSession(request.getAgentId(), "默认会话 " + LocalDateTime.now());
        }

        // Persist the user message
        ChatMessageDTO userMessage = new ChatMessageDTO();
        userMessage.setSessionId(session.getId());
        userMessage.setRole("user");
        userMessage.setContent(request.getMessage());
        chatMessageService.createMessage(userMessage);

        // Fetch the tool list from MCP
        List<Map<String, Object>> tools = mcpClientService.fetchAndConvertMcpTools();

        // Load the history by the RESOLVED session id. The request's session id is null
        // when the session was just created, which would drop the message saved above.
        List<ChatMessageDTO> messages = chatMessageService.getMessagesBySessionId(session.getId());
        StringBuilder aiResponseBuilder = new StringBuilder();
        List<Map<String, Object>> history = messages.stream().map(
                msg -> Map.of(
                        "role", msg.getRole(),
                        "content", (Object) msg.getContent()
                )
        ).collect(Collectors.toList());

        // Call the streaming AI service
        return zhipuaiService.streamChat(history, tools)
                .doOnNext(content -> {
                    log.info("收到AI回复片段: {}", content);
                    aiResponseBuilder.append(content);
                })
                .doOnComplete(() -> {
                    // Stream finished: persist the complete assistant reply
                    String fullResponse = aiResponseBuilder.toString();
                    if (!fullResponse.isEmpty()) {
                        ChatMessageDTO aiMessage = new ChatMessageDTO();
                        aiMessage.setSessionId(session.getId());
                        aiMessage.setRole("assistant");
                        aiMessage.setContent(fullResponse);
                        chatMessageService.createMessage(aiMessage);
                        log.info("AI回复完成并保存,会话ID: {}, 内容长度: {}", session.getId(), fullResponse.length());
                    }
                })
                .onErrorResume(e -> {
                    log.error("AI服务调用失败", e);
                    // Persist and emit a fallback message so the client still gets a reply
                    String errorMessage = "抱歉,AI服务暂时不可用,请稍后再试。";
                    ChatMessageDTO errorMessageDto = new ChatMessageDTO();
                    errorMessageDto.setSessionId(session.getId());
                    errorMessageDto.setRole("assistant");
                    errorMessageDto.setContent(errorMessage);
                    chatMessageService.createMessage(errorMessageDto);
                    return Flux.just(errorMessage);
                });
    } catch (Exception e) {
        log.error("处理流式消息失败", e);
        // Keep the original exception as the cause so the stack trace is not lost.
        return Flux.error(new RuntimeException("处理消息失败: " + e.getMessage(), e));
    }
}
4.3 核心难点:Flux流式处理中的工具调用与递归
这是本文要解决的关键问题:在保持前端流式接收的同时,处理LLM中途发起的工具调用,并在执行工具后递归地继续对话。
/**
 * Upper bound on consecutive tool-call rounds within one chat turn; guards against a model
 * that keeps requesting tools forever.
 */
private static final int MAX_TOOL_ROUNDS = 5;

/**
 * Streams a chat completion and transparently resolves tool calls.
 *
 * <p>Text fragments are emitted as they arrive. If the model's stream ends with a complete
 * tool call, the MCP tool is executed, the assistant tool-call message and the tool result
 * are appended to the history, and a follow-up completion is streamed on the same Flux.
 *
 * @param messages conversation history in OpenAI-compatible message format
 * @param tools    OpenAI-compatible tool definitions; omitted from the request when null or empty
 * @return the non-empty text fragments of the reply; an error Flux if configuration is
 *         missing, setup fails, or more than {@code MAX_TOOL_ROUNDS} tool rounds are requested
 */
@Override
public Flux<String> streamChat(List<Map<String, Object>> messages, List<Map<String, Object>> tools) {
    return streamChat(messages, tools, 0);
}

/** Implementation of {@code streamChat} that tracks how many tool rounds have already run. */
private Flux<String> streamChat(List<Map<String, Object>> messages,
                                List<Map<String, Object>> tools,
                                int toolRound) {
    try {
        // 1. Load the AI service configuration
        AIConfigurationDTO config = configurationService.getConfigurationByProvider("zhipuai");
        if (config == null || !StringUtils.hasText(config.getApiKey())) {
            return Flux.error(new RuntimeException("智谱AI配置未找到或API Key未配置"));
        }
        String baseUrl = StringUtils.hasText(config.getBaseUrl()) ? config.getBaseUrl() : "https://open.bigmodel.cn/api/paas/v4";
        String model = StringUtils.hasText(config.getModel()) ? config.getModel() : "glm-4";

        // 2. Build the request body
        Map<String, Object> requestBody = new HashMap<>();
        requestBody.put("model", model);
        requestBody.put("stream", true);
        requestBody.put("temperature", 0.7);
        requestBody.put("max_tokens", 2048);
        if (tools != null && !tools.isEmpty()) {
            requestBody.put("tools", tools);
        }
        requestBody.put("messages", messages);

        // 3. Create the WebClient
        WebClient webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader("Authorization", "Bearer " + config.getApiKey())
                .defaultHeader("Content-Type", "application/json")
                .build();
        log.info("开始调用智谱AI流式聊天,模型: {}", model);

        // 4. Defer so that every subscription gets its own accumulation state.
        return Flux.defer(() -> {
            StreamContext context = new StreamContext();

            // 5. Issue the request and turn raw chunks into text fragments
            Flux<String> modelStream = webClient.post()
                    .uri("/chat/completions")
                    .contentType(MediaType.APPLICATION_JSON)
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .bodyValue(requestBody)
                    .retrieve()
                    .bodyToFlux(String.class)
                    .timeout(Duration.ofSeconds(60))
                    .filter(line -> !line.trim().isEmpty() && !"[DONE]".equals(line.trim()))
                    .map(chunk -> extractDeltaText(chunk, context))
                    // Tool-call and unparsable chunks map to ""; do not forward them downstream.
                    .filter(text -> !text.isEmpty());

            // 6. After the model stream completes, run the tool (if any) and continue the chat.
            //    The inner defer delays the check until the first stream has finished.
            return modelStream.concatWith(
                    Flux.defer(() -> continueWithToolResult(context, messages, tools, toolRound)));
        });
    } catch (Exception e) {
        log.error("初始化智谱AI流式服务失败", e);
        return Flux.error(new RuntimeException("初始化AI服务失败: " + e.getMessage(), e));
    }
}

/**
 * Parses one streamed chunk, records tool-call fragments in {@code context} and returns the
 * chunk's text content, or {@code ""} when it has none or cannot be parsed.
 */
private String extractDeltaText(String jsonStr, StreamContext context) {
    try {
        JsonNode root = objectMapper.readTree(jsonStr);
        JsonNode choices = root.get("choices");
        if (choices == null || choices.isEmpty()) {
            return "";
        }
        JsonNode delta = choices.get(0).get("delta");
        if (delta == null || delta.isNull()) {
            return "";
        }

        // Handle tool calls first: a delta may carry both "content" and "tool_calls",
        // and returning early on content would silently drop the tool call.
        JsonNode toolCalls = delta.get("tool_calls");
        if (toolCalls != null && toolCalls.isArray() && !toolCalls.isEmpty()) {
            // Only the first entry is tracked: a single tool call per turn is assumed.
            accumulateToolCall(toolCalls.get(0), context);
        }

        if (delta.hasNonNull("content")) {
            String text = delta.get("content").asText();
            context.fullContent.append(text);
            return text;
        }
        return "";
    } catch (Exception e) {
        log.error("解析流数据块错误", e);
        return "";
    }
}

/** Merges one tool-call fragment (id, function name, argument piece) into {@code context}. */
private void accumulateToolCall(JsonNode toolCall, StreamContext context) {
    context.isToolCall = true;
    if (toolCall.hasNonNull("id")) {
        context.toolId = toolCall.get("id").asText();
    }
    JsonNode func = toolCall.get("function");
    if (func == null) {
        return;
    }
    if (func.hasNonNull("name")) {
        context.toolName = func.get("name").asText();
    }
    if (func.hasNonNull("arguments")) {
        // Arguments may be split across chunks, so they are appended rather than replaced.
        context.toolArgsBuffer.append(func.get("arguments").asText());
    }
}

/**
 * Runs the tool recorded in {@code ctx}, if any, and streams the follow-up completion.
 * Returns an empty Flux when the finished stream contained no complete tool call.
 */
private Flux<String> continueWithToolResult(StreamContext ctx,
                                            List<Map<String, Object>> messages,
                                            List<Map<String, Object>> tools,
                                            int toolRound) {
    if (!ctx.hasCompleteToolCall()) {
        return Flux.empty();
    }
    if (toolRound >= MAX_TOOL_ROUNDS) {
        log.warn("Tool-call round limit ({}) reached, aborting", MAX_TOOL_ROUNDS);
        return Flux.error(new IllegalStateException(
                "Exceeded maximum number of tool-call rounds: " + MAX_TOOL_ROUNDS));
    }

    String toolName = ctx.toolName;
    String toolArgs = ctx.toolArgsBuffer.toString();
    log.info("检测到工具调用: {},参数: {}", toolName, toolArgs);

    // executeMcpTool is a blocking HTTP call; run it on the bounded-elastic scheduler
    // instead of the thread that completed the upstream stream.
    return reactor.core.publisher.Mono
            .fromCallable(() -> mcpClientService.executeMcpTool(toolName, toolArgs))
            .subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic())
            // fromCallable completes empty on a null result; still continue the conversation.
            .defaultIfEmpty("")
            .flatMapMany(toolResult -> streamChat(
                    appendToolExchange(messages, ctx.toolId, toolName, toolArgs, toolResult),
                    tools,
                    toolRound + 1));
}

/**
 * Returns a copy of {@code messages} extended with the assistant's tool-call message and
 * the matching tool-result message.
 */
private List<Map<String, Object>> appendToolExchange(List<Map<String, Object>> messages,
                                                     String toolId,
                                                     String toolName,
                                                     String toolArgs,
                                                     String toolResult) {
    List<Map<String, Object>> nextMessages = new ArrayList<>(messages);
    String toolCallId = toolId != null ? toolId : "call_" + UUID.randomUUID();

    // (1) The assistant message that requested the tool
    Map<String, Object> toolCallObj = new HashMap<>();
    toolCallObj.put("id", toolCallId);
    toolCallObj.put("type", "function");
    toolCallObj.put("function", Map.of(
            "name", toolName,
            "arguments", toolArgs
    ));
    Map<String, Object> assistantMsg = new HashMap<>();
    assistantMsg.put("role", "assistant");
    assistantMsg.put("content", null);
    assistantMsg.put("tool_calls", List.of(toolCallObj));
    nextMessages.add(assistantMsg);

    // (2) The tool's result, linked to the call by id
    Map<String, Object> toolMsg = new HashMap<>();
    toolMsg.put("role", "tool");
    toolMsg.put("tool_call_id", toolCallId);
    toolMsg.put("content", toolResult);
    nextMessages.add(toolMsg);

    return nextMessages;
}
/**
 * Mutable holder for state accumulated while one streamed response is being consumed.
 */
@Data
private static class StreamContext {
    /** Plain-text reply accumulated so far. */
    StringBuilder fullContent = new StringBuilder();
    /** Tool-call arguments accumulated so far; they may arrive split across chunks. */
    StringBuilder toolArgsBuffer = new StringBuilder();
    /** Name of the requested tool, or {@code null} if none has been seen. */
    String toolName;
    /** Id of the tool call, or {@code null} if none has been seen. */
    String toolId;
    /** Set once any tool-call fragment has been seen. */
    boolean isToolCall;

    /**
     * Tells whether enough information for a tool invocation has been collected.
     *
     * @return {@code true} only if a tool call was flagged, a tool name is known and at
     *         least one character of arguments has been buffered
     */
    public boolean hasCompleteToolCall() {
        if (!isToolCall) {
            return false;
        }
        if (toolName == null) {
            return false;
        }
        return toolArgsBuffer.length() >= 1;
    }
}
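核心设计思路:利用Flux.concatWith操作符,在主数据流结束后检查上下文状态。这里的关键在于concatWith的参数被包裹在Flux.defer中:defer中的逻辑不会在组装流时执行,而是等到主数据流完成、后续流被订阅时才执行,因此此时上下文中已经累积了本轮完整的工具调用信息。如果检测到完整的工具调用,则执行MCP工具,更新对话历史,并递归调用streamChat方法开启新一轮的流式交互。这样就实现了在保持前端连接不中断的前提下,完成“LLM请求工具 -> 执行工具 -> LLM回复结果”的完整递归流程,并始终保持流式输出。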