一、删除会话接口开发实战
在开发对话型应用的后台时,会话管理是核心功能之一。用户需要能够清理或删除不再需要的对话记录。本文将以一个实际的开发任务为例,记录如何基于现有代码结构,快速实现一个删除会话的接口。
1. 分析与定位原始接口
接到需求后,首先要做的是找到系统内已存在的类似接口,理解其设计逻辑,以便快速复用或参考。
a. 接口地址定位
通过检查代码或网络请求,我们定位到原始的删除会话接口地址为:
http://localhost:9222/v1/conversation/rm
该接口是一个POST请求,需要传递特定格式的参数。
b. 源码逻辑分析
查看ragflow项目中的相关源码,我们发现接口的核心处理逻辑如下:
@manager.route('/rm', methods=['POST'])
@login_required
def rm():
conv_ids = request.json["conversation_ids"]
try:
for cid in conv_ids:
exist, conv = ConversationService.get_by_id(cid)
if not exist:
return get_data_error_result(retmsg="Conversation not found!")
tenants = UserTenantService.query(user_id=current_user.id)
for tenant in tenants:
if DialogService.query(tenant_id=tenant.tenant_id, id=conv.dialog_id):
break
else:
return get_json_result(
data=False, retmsg=f'Only owner of conversation authorized for this operation.',
retcode=RetCode.OPERATING_ERROR)
ConversationService.delete_by_id(cid)
return get_json_result(data=True)
except Exception as e:
return server_error_response(e)
这段代码的核心逻辑是:接收一个会话ID列表,依次检查每个会话是否存在,并验证当前用户是否有权限操作该会话所属的对话(Dialog),验证通过后执行删除操作。
2. 在新服务中实现接口
理解了原始逻辑后,我们需要在自己的chat_serve服务中实现一个功能相似的接口。由于新服务的认证和上下文可能不同,我们需要进行一些适配。
a. 服务端代码实现
在自己的 chat_api.py 中,我们实现新的路由 /rm_conversation。

上图左侧展示了使用API调试工具(如Postman或Apifox)调用该接口的过程。我们向 {host}:8789/api/rm_conversation 发起POST请求,Body中传入JSON格式的 conversation_ids 和 dialog_id。
右侧对应的Python核心实现代码如下:
@router.post('/rm_conversation')
async def set_conversation(request:Request):
req = await request.body()
req = json.loads(req.decode("utf-8"))
conv_ids = []
if 'conversation_ids' in req and req['conversation_ids']:
conv_ids = req['conversation_ids']
try:
for cid in conv_ids:
exist, conv = ConversationService.get_by_id(cid)
if not exist:
return error(retmsg="Conversation not found!")
# 权限校验逻辑(此处根据业务需要调整或注释)
# tenants = UserTenantService.query(user_id=current_user.id)
# if DialogService.query(tenant_idtenant.tenant_id, id=conv.dialog_id):
# break
# else:
# return get_json_result(
# data=False,
# retmsg="Only owner of conversation authorized for this operation.",
# retcode=RetCode.OPERATING_ERROR)
ConversationService.delete_by_id(cid)
return ok(data=True)
except Exception as e:
return error(e)
关键点说明:
- 请求解析:通过
request.body() 获取原始字节数据,然后解码并解析为JSON。
- 参数处理:安全地获取
conversation_ids 参数。
- 循环删除:遍历ID列表,逐一检查并删除。
- 权限校验:原始代码中的多租户权限校验逻辑,在新服务中可根据实际情况保留、修改或暂时注释掉。
- 异常处理:用try-except包裹核心逻辑,确保服务稳定性。
b. 接口测试
调用接口后,服务端返回了预期的成功响应,表明删除操作执行成功。
{
"code": 0,
"data": true,
"msg": "success"
}
二、WebSocket客户端消息收发实战
除了常规的HTTP REST API,实时通信是对话系统的另一个关键需求。接下来,我们实现一个WebSocket服务端,并创建一个网页客户端与之进行双向通信。
1. 建立基础的WebSocket通信
目标是先打通一个最简流程:客户端发送文本,服务端接收并原样返回。
a. 服务端代码 (FastAPI)
首先,使用FastAPI的 WebSocket 模块创建一个简单的端点。

对应的代码如下:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print(data)
await websocket.send_text(f"Message text was: {data}")
except WebSocketDisconnect:
print("Client disconnected")
这段代码完成了三件事:
websocket.accept():接受客户端的连接请求。
websocket.receive_text():在一个无限循环中等待并接收客户端发来的文本消息。
websocket.send_text(...):将接收到的消息加上前缀后发回给客户端。
b. 客户端代码 (HTML/JavaScript)
为了快速测试,我们创建一个临时的FastAPI应用来提供测试网页。

此页面核心的JavaScript连接逻辑如下:
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Test</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:6789/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
c. 运行与测试
分别启动服务端(端口6789)和客户端页面服务(端口8082),在浏览器中访问客户端页面进行测试。

从上图可以看到完整的数据流:
- 服务终端显示:连接建立 (
connection open),收到消息“qhz”和“123”。
- 浏览器页面显示:发送消息后,收到了服务端返回的
Message text was: qhz 和 Message text was: 123。
至此,一个最简单的WebSocket双向通信通道已经搭建成功。
2. 进阶:处理结构化JSON数据
在实际业务中,客户端发送的往往是结构化的JSON数据,而非纯文本。我们的目标是让客户端发送一个包含对话历史和消息的JSON对象,服务端能够正确解析。
a. 客户端发送JSON
修改客户端的JavaScript代码,构建一个符合业务要求的消息对象并发送。

关键修改在于 sendMessage 函数:
var message = {
"conversation_id": "39a7672db76711ef8bdbf020ff63f4c4",
"messages": [{
"content": "你好!我是专业宠物医生,有什么可以帮到你的吗?",
"role": "assistant"
}, {
"id": "6b616117-c3cc-4788-a451-64c24d161418",
"content": "我家狗感冒了",
"role": "user",
"doc_ids": []
}]
}
ws.send(JSON.stringify(message))
b. 服务端解析JSON
服务端也需要相应调整,将接收到的文本解析为JSON对象。

服务端代码增加JSON解析逻辑:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print(data)
message = json.loads(data) # 关键步骤:解析JSON字符串
print(message)
# response = {"echo": message["text"]}
response = message
print(response)
await websocket.send_json(response) # 使用send_json发送对象
except WebSocketDisconnect:
print("Client disconnect")
现在,服务端可以正确接收、解析并处理客户端发来的复杂JSON消息了。
3. 对接外部业务接口
在能够接收结构化数据后,下一步就是将这个数据发送给真正的业务处理接口(例如 ragflow 的对话补全接口),并将结果流式地返回给WebSocket客户端。
a. 接口分析与鉴权
目标业务接口是一个流式返回的HTTP接口:
http://localhost:9222/v1/conversation/completion
直接调用该接口可能会遇到鉴权问题。我们先尝试调用一个查询会话详情的相关接口,以排查问题。
GET http://localhost:9222/v1/api/conversation/39a7672db76711ef8bdbf020ff63f4c4

调用返回错误,提示 Token is not valid!。这表明需要在请求头中携带有效的认证信息。
b. 追踪鉴权逻辑
通过阅读目标服务的源码,我们可以找到鉴权的具体实现逻辑。

从代码中可以看到,接口从 Authorization 请求头中提取Token,并与数据库中进行校验。同时,它还校验该Token是否对请求的 conversation_id 所属的对话有访问权限。这意味着,在我们的WebSocket服务端代码中,需要先获取或生成一个有效的Token,并在调用外部接口时将其添加到请求头中。
c. 数据库查询验证
在解决鉴权问题前,有时也需要确认数据是否存在。通过追踪服务端的SQL查询,可以明确数据模型:
SELECT `t1`.`id`, `t1`.`create_time`, `t1`.`create_date`, `t1`.`update_time`, `t1`.`update_date`, `t1`.`dialog_id`, `t1`.`user_id`, `t1`.`message`, `t1`.`reference`, `t1`.`tokens`, `t1`.`source`, `t1`.`duration`, `t1`.`round`, `t1`.`thumb_up`
FROM `api_4_conversation` AS `t1`
WHERE (`t1`.`id` = '39a7672db76711ef8bdbf020ff63f4c4')
这个查询确认了会话数据存储在一张名为 api_4_conversation 的表中,为后续的集成提供了数据层面的参考。
总结与展望
本文通过两个实际开发场景,详细记录了如何在Python FastAPI框架下实现“删除会话”的REST接口和“收发消息”的WebSocket实时通信。从分析现有代码到实现、测试、对接,完整地展现了后端开发中常见的迭代过程。
对于WebSocket部分,我们完成了从建立连接、收发文本到处理JSON数据的进阶。下一步的关键是集成外部流式接口,这涉及到服务端如何管理认证Token、如何转发请求以及如何将流式响应分块推送给WebSocket客户端。这些实践对于构建聊天机器人、实时通知系统等应用具有直接的参考价值。希望这篇笔记能对在云栈社区学习和交流的开发者有所帮助。