学校建设网站报告书,做教育业网站,网络管理软件app,产品推广会议流程最近要实现一个在线聊天功能#xff0c;基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
1.问题难点
在线上环境部署时#xff0c;一般是多进程的方式进行部署启动fastapi服务#xff0c;而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…最近要实现一个在线聊天功能基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
1.问题难点
在线上环境部署时一般是多进程的方式进行部署启动fastapi服务而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中当进行通信时可能无法找到已连接的连接对象。
2.解决方案
使用使用redis的订阅发布机制使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。
3.方案设计
每个进程启动的时候都进行一个消息的订阅。通过http请求进行消息发布。每个进程收到发布的消息后进行判断是否由自己进行处理。
4.代码实现
①在服务启动时进行消息订阅并一直监听消息通道。当有消息发布时进行消息处理。
# 初始化app
app FastAPI(titleWs Chat, description测试, version1.0.0)
app.openapi_version 3.0.0app.include_router(chat.app, prefix/api/chat, tags[Chat])app.on_event(startup)
async def on_startup():print(f订阅初始化:{os.getpid()})# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen(): # 监听通道# print(msg)msg_data msg.get(data)if msg_data and isinstance(msg_data, str):msg_data_dict json.loads(msg_data)print(fchat:{msg_data_dict})sender msg_data_dict.get(sender)# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool aioredis.from_url(redis://{}.format(host), dbdb, passwordpassword, portport, encodingutf-8, decode_responsesTrue)psub pool.pubsub()async with psub as p:# 消息订阅await p.subscribe(chat)await reader(p)await p.unsubscribe(chat)②websocket处理类
from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] websocketawait websocket.send_json({type: system,msg: Welcome to the chat app!,sender: system,recipient: client_id})try:# 处理消息while True:# 获取信息message await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get(type) private_message:recipient message.get(recipient)msg message.get(msg)recipient_conn self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({type: private_message,sender: client_id,msg: msg,recipient: recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]
③websocket连接
from app.chat_manager.server import ConnectionManagercm ConnectionManager()app.websocket(/connect_chat)
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]
④http请求进行消息发布
app.post(/create_chat, summary发起聊天)
async def create_chat(param: DiagnosisChatSch, rDepends(get_redis)):ws_param {type: private_message,msg: param.msg,sender: param.sender,recipient: param.recipient}# 进行消息发布await r.publish(diagnosis_chat, json.dumps(ws_param))return {code: 200, msg: 成功, data: }5.源码
github源码地址https://github.com/zhangyukuo/fastapi_ws_chat
6.参考文章
https://www.cnblogs.com/a00ium/p/16931133.html