找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2741

积分

0

好友

348

主题
发表于 昨天 06:28 | 查看: 0| 回复: 0

一、删除会话接口开发实战

在开发对话型应用的后台时,会话管理是核心功能之一。用户需要能够清理或删除不再需要的对话记录。本文将以一个实际的开发任务为例,记录如何基于现有代码结构,快速实现一个删除会话的接口。

1. 分析与定位原始接口

接到需求后,首先要做的是找到系统内已存在的类似接口,理解其设计逻辑,以便快速复用或参考。

a. 接口地址定位
通过检查代码或网络请求,我们定位到原始的删除会话接口地址为:

http://localhost:9222/v1/conversation/rm

该接口是一个POST请求,需要传递特定格式的参数。

b. 源码逻辑分析
查看ragflow项目中的相关源码,我们发现接口的核心处理逻辑如下:

@manager.route('/rm', methods=['POST'])
@login_required
def rm():
    conv_ids = request.json["conversation_ids"]
    try:
        for cid in conv_ids:
            exist, conv = ConversationService.get_by_id(cid)
            if not exist:
                return get_data_error_result(retmsg="Conversation not found!")
            tenants = UserTenantService.query(user_id=current_user.id)
            for tenant in tenants:
                if DialogService.query(tenant_id=tenant.tenant_id, id=conv.dialog_id):
                    break
            else:
                return get_json_result(
                    data=False, retmsg=f'Only owner of conversation authorized for this operation.',
                    retcode=RetCode.OPERATING_ERROR)
            ConversationService.delete_by_id(cid)
        return get_json_result(data=True)
    except Exception as e:
        return server_error_response(e)

这段代码的核心逻辑是:接收一个会话ID列表,依次检查每个会话是否存在,并验证当前用户是否有权限操作该会话所属的对话(Dialog),验证通过后执行删除操作。

2. 在新服务中实现接口

理解了原始逻辑后,我们需要在自己的chat_serve服务中实现一个功能相似的接口。由于新服务的认证和上下文可能不同,我们需要进行一些适配。

a. 服务端代码实现
在自己的 chat_api.py 中,我们实现新的路由 /rm_conversation
FastAPI路由实现与API调试界面

上图左侧展示了使用API调试工具(如Postman或Apifox)调用该接口的过程。我们向 {host}:8789/api/rm_conversation 发起POST请求,Body中传入JSON格式的 conversation_idsdialog_id

右侧对应的Python核心实现代码如下:

@router.post('/rm_conversation')
async def set_conversation(request:Request):
    req = await request.body()
    req = json.loads(req.decode("utf-8"))
    conv_ids = []
    if 'conversation_ids' in req and req['conversation_ids']:
        conv_ids = req['conversation_ids']
    try:
        for cid in conv_ids:
            exist, conv = ConversationService.get_by_id(cid)
            if not exist:
                return error(retmsg="Conversation not found!")
            # 权限校验逻辑(此处根据业务需要调整或注释)
            # tenants = UserTenantService.query(user_id=current_user.id)
            # if DialogService.query(tenant_idtenant.tenant_id, id=conv.dialog_id):
            #     break
            # else:
            #     return get_json_result(
            #         data=False,
            #         retmsg="Only owner of conversation authorized for this operation.",
            #         retcode=RetCode.OPERATING_ERROR)
            ConversationService.delete_by_id(cid)
        return ok(data=True)
    except Exception as e:
        return error(e)

关键点说明:

  1. 请求解析:通过 request.body() 获取原始字节数据,然后解码并解析为JSON。
  2. 参数处理:安全地获取 conversation_ids 参数。
  3. 循环删除:遍历ID列表,逐一检查并删除。
  4. 权限校验:原始代码中的多租户权限校验逻辑,在新服务中可根据实际情况保留、修改或暂时注释掉。
  5. 异常处理:用try-except包裹核心逻辑,确保服务稳定性。

b. 接口测试
调用接口后,服务端返回了预期的成功响应,表明删除操作执行成功。

{
  "code": 0,
  "data": true,
  "msg": "success"
}

二、WebSocket客户端消息收发实战

除了常规的HTTP REST API,实时通信是对话系统的另一个关键需求。接下来,我们实现一个WebSocket服务端,并创建一个网页客户端与之进行双向通信。

1. 建立基础的WebSocket通信

目标是先打通一个最简流程:客户端发送文本,服务端接收并原样返回。

a. 服务端代码 (FastAPI)
首先,使用FastAPI的 WebSocket 模块创建一个简单的端点。
WebSocket服务端基础实现代码

对应的代码如下:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print(data)
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

这段代码完成了三件事:

  1. websocket.accept():接受客户端的连接请求。
  2. websocket.receive_text():在一个无限循环中等待并接收客户端发来的文本消息。
  3. websocket.send_text(...):将接收到的消息加上前缀后发回给客户端。

b. 客户端代码 (HTML/JavaScript)
为了快速测试,我们创建一个临时的FastAPI应用来提供测试网页。
WebSocket测试页面HTML与JS实现

此页面核心的JavaScript连接逻辑如下:

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket Test</title>
    </head>
    <body>
        <h1>WebSocket Test</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:6789/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""

c. 运行与测试
分别启动服务端(端口6789)和客户端页面服务(端口8082),在浏览器中访问客户端页面进行测试。
WebSocket通信测试全流程截图

从上图可以看到完整的数据流:

  1. 服务终端显示:连接建立 (connection open),收到消息“qhz”和“123”。
  2. 浏览器页面显示:发送消息后,收到了服务端返回的 Message text was: qhzMessage text was: 123

至此,一个最简单的WebSocket双向通信通道已经搭建成功。

2. 进阶:处理结构化JSON数据

在实际业务中,客户端发送的往往是结构化的JSON数据,而非纯文本。我们的目标是让客户端发送一个包含对话历史和消息的JSON对象,服务端能够正确解析。

a. 客户端发送JSON
修改客户端的JavaScript代码,构建一个符合业务要求的消息对象并发送。
构建并发送JSON格式消息的客户端代码

关键修改在于 sendMessage 函数:

var message = {
  "conversation_id": "39a7672db76711ef8bdbf020ff63f4c4",
  "messages": [{
    "content": "你好!我是专业宠物医生,有什么可以帮到你的吗?",
    "role": "assistant"
  }, {
    "id": "6b616117-c3cc-4788-a451-64c24d161418",
    "content": "我家狗感冒了",
    "role": "user",
    "doc_ids": []
  }]
}
ws.send(JSON.stringify(message))

b. 服务端解析JSON
服务端也需要相应调整,将接收到的文本解析为JSON对象。
服务端解析JSON数据的代码

服务端代码增加JSON解析逻辑:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print(data)
            message = json.loads(data)  # 关键步骤:解析JSON字符串
            print(message)
            # response = {"echo": message["text"]}
            response = message
            print(response)
            await websocket.send_json(response)  # 使用send_json发送对象
    except WebSocketDisconnect:
        print("Client disconnect")

现在,服务端可以正确接收、解析并处理客户端发来的复杂JSON消息了。

3. 对接外部业务接口

在能够接收结构化数据后,下一步就是将这个数据发送给真正的业务处理接口(例如 ragflow 的对话补全接口),并将结果流式地返回给WebSocket客户端。

a. 接口分析与鉴权
目标业务接口是一个流式返回的HTTP接口:

http://localhost:9222/v1/conversation/completion

直接调用该接口可能会遇到鉴权问题。我们先尝试调用一个查询会话详情的相关接口,以排查问题。

GET http://localhost:9222/v1/api/conversation/39a7672db76711ef8bdbf020ff63f4c4

调用外部API返回Token无效错误

调用返回错误,提示 Token is not valid!。这表明需要在请求头中携带有效的认证信息。

b. 追踪鉴权逻辑
通过阅读目标服务的源码,我们可以找到鉴权的具体实现逻辑。
会话详情接口的鉴权逻辑代码

从代码中可以看到,接口从 Authorization 请求头中提取Token,并与数据库中进行校验。同时,它还校验该Token是否对请求的 conversation_id 所属的对话有访问权限。这意味着,在我们的WebSocket服务端代码中,需要先获取或生成一个有效的Token,并在调用外部接口时将其添加到请求头中。

c. 数据库查询验证
在解决鉴权问题前,有时也需要确认数据是否存在。通过追踪服务端的SQL查询,可以明确数据模型:

SELECT `t1`.`id`, `t1`.`create_time`, `t1`.`create_date`, `t1`.`update_time`, `t1`.`update_date`, `t1`.`dialog_id`, `t1`.`user_id`, `t1`.`message`, `t1`.`reference`, `t1`.`tokens`, `t1`.`source`, `t1`.`duration`, `t1`.`round`, `t1`.`thumb_up`
FROM `api_4_conversation` AS `t1`
WHERE (`t1`.`id` = '39a7672db76711ef8bdbf020ff63f4c4')

这个查询确认了会话数据存储在一张名为 api_4_conversation 的表中,为后续的集成提供了数据层面的参考。

总结与展望

本文通过两个实际开发场景,详细记录了如何在Python FastAPI框架下实现“删除会话”的REST接口和“收发消息”的WebSocket实时通信。从分析现有代码到实现、测试、对接,完整地展现了后端开发中常见的迭代过程。

对于WebSocket部分,我们完成了从建立连接、收发文本到处理JSON数据的进阶。下一步的关键是集成外部流式接口,这涉及到服务端如何管理认证Token、如何转发请求以及如何将流式响应分块推送给WebSocket客户端。这些实践对于构建聊天机器人、实时通知系统等应用具有直接的参考价值。希望这篇笔记能对在云栈社区学习和交流的开发者有所帮助。




上一篇:Spring SpEL实战:结合AOP打造动态规则引擎,告别业务硬编码
下一篇:极简主义AI编码智能体Pi-mono解析:为何功能越少,GitHub越火?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-9 00:32 , Processed in 1.306012 second(s), 47 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表