功能说明
本教程将引导你使用 FastAPI 框架与 WebSocket 协议,实现一个简易的一对一在线客服系统。该系统具备以下核心功能:
- 模拟一对一客服聊天:每个客户与客服建立独立的会话通道。
- 聊天记录缓存与重现:为每个客户ID缓存聊天记录。即使客户关闭窗口后再次使用相同ID连接,之前的聊天记录也会完整呈现。
核心代码实现与解析
下面我们将通过完整的代码来展示如何实现上述功能。代码包含了数据模型、连接管理、前端页面以及主要的WebSocket路由处理逻辑。
# -*- coding:utf-8 -*-
# @FileName :demo.py
# @Time :2026/01/25 13:47
# @Author :Rebort
# @Info :
import uvicorn
from fastapi import FastAPI
from starlette.websockets import WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List
from datetime import datetime
# Host name embedded in the ws:// URLs of the chat pages generated below.
websocket_ip = 'localhost'
# FastAPI application on which the HTTP and WebSocket routes below are registered.
app = FastAPI()
class ChatText:
    """A single chat message, stamped with the local time at which it was created."""

    def __init__(self, chat_src, chat_text):
        """
        :param chat_src: origin of the message; 0 = customer side, 1 = service side
        :param chat_text: the message body
        """
        created_at = datetime.now()
        self.chat_src, self.chat_text = chat_src, chat_text
        # Stored as a pre-formatted string; the endpoints embed it directly in outgoing text.
        self.timestamp = f'{created_at:%Y-%m-%d %H:%M:%S}'
class CustomerChatUtils:
    """Session state for one customer ID: both sockets, an online flag and the message cache."""

    def __init__(self, client_id):
        """
        :param client_id: identifier of the customer this session belongs to
        """
        created_at = datetime.now()
        self.client_id = client_id
        # Neither side is attached yet; the WebSocket endpoints fill these in.
        self.server_websocket = self.customer_websocket = None
        # Set to True by the customer endpoint while the customer is connected.
        self.active = False
        self.timestamp = f'{created_at:%Y-%m-%d %H:%M:%S}'
        # Messages from both sides, in the order they were appended.
        self.chat_message_list: List[ChatText] = []
class CustomerServiceUtils:
    """Registry of live WebSocket connections and of per-customer chat sessions.

    Sessions are created on first lookup and are never removed by this class,
    so a later lookup with the same ``client_id`` returns the same session
    object together with its cached messages.
    """

    def __init__(self):
        self.client_ids = []
        # Sockets accepted through customer_connect() and not yet disconnected.
        self.customer_active_connections: List[WebSocket] = []
        # Sockets accepted through server_connect() and not yet disconnected.
        self.server_active_connections: List[WebSocket] = []
        # One session object per client_id that has been looked up so far.
        self.customer_chat_obj: List[CustomerChatUtils] = []

    async def customer_connect(self, websocket: WebSocket):
        """Accept a customer-side socket and record it as active."""
        await websocket.accept()
        self.customer_active_connections.append(websocket)

    async def server_connect(self, websocket: WebSocket):
        """Accept a service-side socket and record it as active."""
        await websocket.accept()
        self.server_active_connections.append(websocket)

    def customer_disconnect(self, websocket: WebSocket):
        """Forget a customer-side socket and mark every session using it as offline.

        Calling this again for a socket that was already removed is a no-op.
        """
        # Guard the removal: list.remove() raises ValueError for an absent item.
        if websocket in self.customer_active_connections:
            self.customer_active_connections.remove(websocket)
        for item in self.customer_chat_obj:
            if item.customer_websocket == websocket:
                item.customer_websocket = None
                item.active = False

    def server_disconnect(self, websocket: WebSocket):
        """Forget a service-side socket and detach it from every session using it.

        Calling this again for a socket that was already removed is a no-op.
        """
        if websocket in self.server_active_connections:
            self.server_active_connections.remove(websocket)
        for item in self.customer_chat_obj:
            if item.server_websocket == websocket:
                item.server_websocket = None

    def get_customer_chat_obj(self, client_id) -> CustomerChatUtils:
        """Return the session for ``client_id``, creating and registering it if absent.

        :param client_id: customer identifier to look up
        :return: the existing or newly created session, which carries the chat history
        """
        session = next(
            (item for item in self.customer_chat_obj if item.client_id == client_id),
            None,
        )
        if session is None:
            session = CustomerChatUtils(client_id)
            self.customer_chat_obj.append(session)
        return session
# Single module-level registry shared by the WebSocket endpoints below.
customer_service_utils = CustomerServiceUtils()
@app.get("/chat/client/{client_id}")
async def chat_client(client_id: int):
    """Return the HTML page for the customer-side chat window.

    The page opens a WebSocket to ``/ws/customer/{client_id}`` on
    ``websocket_ip`` port 8000, appends every received message to a list,
    and sends the content of the text box when the form is submitted.

    :param client_id: customer ID taken from the URL path
    :return: an ``HTMLResponse`` holding the page
    """
    ws_url = f"ws://{websocket_ip}:8000/ws/customer/{client_id}"
    # Only this first part is an f-string; the script part below is a plain
    # string so the JavaScript braces need no escaping.
    page_head = f"""
<!DOCTYPE html>
<html>
<head>
<title>客户端</title>
</head>
<body>
<h1>客户端 Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("{ws_url}");
"""
    page_script = """
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
    return HTMLResponse(page_head + page_script)
@app.get("/chat/server/{client_id}")
async def chat_server(client_id: int):
    """Return the HTML page for the service-side chat window.

    The page opens a WebSocket to ``/ws/server/{client_id}`` on
    ``websocket_ip`` port 8000, appends every received message to a list,
    and sends the content of the text box when the form is submitted.

    :param client_id: ID of the customer being served, taken from the URL path
    :return: an ``HTMLResponse`` holding the page
    """
    ws_url = f"ws://{websocket_ip}:8000/ws/server/{client_id}"
    # Only this first part is an f-string; the script part below is a plain
    # string so the JavaScript braces need no escaping.
    page_head = f"""
<!DOCTYPE html>
<html>
<head>
<title>服务端</title>
</head>
<body>
<h1>服务端 Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("{ws_url}");
"""
    page_script = """
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
    return HTMLResponse(page_head + page_script)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo endpoint: reply to every received text frame with ``Message text was: <text>``.

    The loop runs until the client closes the connection.
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        # The client closing the socket is the normal way for this loop to end;
        # do not let it escape the handler as an unhandled exception.
        pass
@app.websocket("/ws/customer/{client_id}")
async def websocket_endpoint_customer(websocket: WebSocket, client_id: int):
    """Serve the customer side of the chat for ``client_id``.

    On connect the session for ``client_id`` is marked active, this socket is
    attached to it and the cached history is replayed. Afterwards every
    received message is cached, echoed back to the customer and, when a
    service-side socket is attached to the session, forwarded to it.

    :param websocket: the customer's WebSocket connection
    :param client_id: customer ID taken from the URL path
    """
    await customer_service_utils.customer_connect(websocket)
    try:
        customer_chat_obj = customer_service_utils.get_customer_chat_obj(client_id)
        customer_chat_obj.active = True
        customer_chat_obj.customer_websocket = websocket
        # Replay the cached conversation from the customer's point of view.
        for chat_message in customer_chat_obj.chat_message_list:
            if chat_message.chat_src == 0:
                await websocket.send_text(f'我 {chat_message.timestamp}: {chat_message.chat_text}')
            else:
                await websocket.send_text(f'客服 {chat_message.timestamp}: {chat_message.chat_text}')
        while True:
            data = await websocket.receive_text()
            print(f'客户端发送消息:{data}')
            _chat_text = ChatText(0, data)
            customer_chat_obj.chat_message_list.append(_chat_text)
            await websocket.send_text(f"我 {_chat_text.timestamp}: {data}")
            # Forward to the service side when an agent is attached.
            if customer_chat_obj.server_websocket is not None:
                await customer_chat_obj.server_websocket.send_text(f"客户{client_id}-{_chat_text.timestamp}: {data}")
    except WebSocketDisconnect:
        # Normal termination: the customer closed the page.
        pass
    finally:
        # Run cleanup on every exit path, not only on WebSocketDisconnect, so an
        # unexpected error cannot leave the socket registered and the session active.
        customer_service_utils.customer_disconnect(websocket)
@app.websocket("/ws/server/{client_id}")
async def websocket_endpoint_server(websocket: WebSocket, client_id: int):
    """Serve the service (agent) side of the chat for ``client_id``.

    The agent is attached only while the session for ``client_id`` is active,
    i.e. while that customer is connected; otherwise a single
    ``非法的client_id`` message is sent and the handler returns. Once attached,
    the cached history is replayed, and every received message is cached,
    echoed back to the agent and, when the customer socket is attached,
    forwarded to the customer.

    :param websocket: the agent's WebSocket connection
    :param client_id: ID of the customer being served, taken from the URL path
    """
    await customer_service_utils.server_connect(websocket)
    try:
        customer_chat_obj = customer_service_utils.get_customer_chat_obj(client_id)
        if not customer_chat_obj.active:
            await websocket.send_text("非法的client_id")
            # The finally clause below deregisters this rejected socket.
            return
        customer_chat_obj.server_websocket = websocket
        # Replay the cached conversation from the agent's point of view.
        for chat_message in customer_chat_obj.chat_message_list:
            if chat_message.chat_src == 0:
                await websocket.send_text(f'客户{client_id}-{chat_message.timestamp}: {chat_message.chat_text}')
            else:
                await websocket.send_text(f'我 {chat_message.timestamp}: {chat_message.chat_text}')
        while True:
            data = await websocket.receive_text()
            print(f'服务端发送消息:{data}')
            _chat_text = ChatText(1, data)
            customer_chat_obj.chat_message_list.append(_chat_text)
            await websocket.send_text(f"我 {_chat_text.timestamp}: {data}")
            if customer_chat_obj.customer_websocket is not None:
                # Same "客服 <timestamp>: <text>" layout the customer endpoint uses
                # when replaying history, so live and replayed lines look alike.
                await customer_chat_obj.customer_websocket.send_text(
                    f"客服 {_chat_text.timestamp}: {data}")
    except WebSocketDisconnect:
        # Normal termination: the agent closed the page.
        pass
    finally:
        # Deregister on every exit path, including the rejected branch above.
        customer_service_utils.server_disconnect(websocket)
if __name__ == "__main__":
    # Start the server only when the file is executed as a script, so that
    # importing this module does not block on a running server.
    # Listens on all interfaces, port 8000 (the port the chat pages connect to).
    uvicorn.run(app, host="0.0.0.0", port=8000)
如何使用
- 将上述代码保存为 demo.py 并运行(例如执行 python demo.py),服务会监听 8000 端口。
- 使用不同的浏览器窗口或标签页分别访问客户端和服务端页面。
- 客户端链接格式:
http://localhost:8000/chat/client/{client_id}
- 例如,访问客户ID为1的客户端页面:
http://localhost:8000/chat/client/1
- 服务端链接格式(请先打开对应ID的客户端页面;若该客户当前不在线,客服页面只会收到"非法的client_id"提示):
http://localhost:8000/chat/server/{client_id}
- 例如,接待客户ID为1的客服页面:
http://localhost:8000/chat/server/1
系统启动后,在客户端页面发送的消息会被缓存,并且会实时推送到对应ID的客服页面。即使关闭页面后重新连接,之前的对话记录也会被加载并显示出来,这得益于我们基于客户ID的会话状态管理机制。需要注意的是,聊天记录只保存在服务进程的内存中,服务重启后会丢失。这种基于 WebSocket 的实时双向通信模型,是构建现代在线交互应用的典型模式。
希望这个基于 FastAPI 的实例能帮助你理解如何构建实时、有状态的网络应用。如果你想了解更多关于后端架构或分布式系统设计的内容,欢迎到 云栈社区 与更多开发者一起交流探讨。
|