网站开发 东莞,网页设计教程案例 杨松,域名服务器上存放着,河北网站制作公司哪家好首先介绍一下具体的功能设计。整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。因为浏览器端不容易注意到实时提醒#xff0c;所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。浏览器端中只对消息进…首先介绍一下具体的功能设计。整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。因为浏览器端不容易注意到实时提醒所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。浏览器端中只对消息进行拉取操作(刷新网页时)而不采取实时推的方式。pyqt桌面端主要接收新消息提示即可通知用户到网页端查看最新消息。(登录时主动拉取一次消息判断有无新消息或者服务器主动推送所有新消息)实时推送消息和主动拉取消息两个功能实际上是完全分离的可以独立行使各自的职能。不过下面我就不写浏览器端的代码了 直接把相关的逻辑也放到pyqt里实现。然后再来讲一下拉取服务端消息的逻辑。每个用户拥有自己的消息库。当消息库为空时一次性拉取服务端所有消息入用户库当消息库内有消息时每次拉取只拉取用户库内最新的消息发布时间之后的消息(将未入用户库的消息入库后再从用户库拉取)。代码共5部分1.鉴权服务器用于服务和服务之间对用户提供的token做权限检查2.消息数据服务器用于读取和处理用户消息在数据库中的状态。3.消息在线推送服务器将管理员发布的消息在线实时推送给所有在线用户(若使用发布器的在线推送功能)4.管理员消息发布器分为在线推送和离线入库两个功能可独立或组合使用。在线推送消息不会入消息库。5.用户消息监听器登陆后可接收管理员推送的在线消息也可主动拉取用户对应消息库中的所有已读、未读消息并标记消息的已读、未读状态。下面是代码部分(为了简化代码逻辑后面代码中的token即用户名。)1.鉴权服务器from flask import Flask, request, jsonifyapp Flask(__name__)class Lib:staticmethoddef auth_admin_token(token):return Truestaticmethoddef auth_user_token(token):return Trueapp.route(/admin, methods [POST])def admin_auth_token():data request.jsonif Lib.auth_admin_token(data[token]):return jsonify({code: 200, msg: 鉴权成功})return jsonify({code: 400, msg: 鉴权失败})app.route(/user, methods [POST])def user_auth_token():data request.jsonif Lib.auth_user_token(data[token]):return jsonify({code: 200, msg: 鉴权成功})return jsonify({code:400, msg: 鉴权失败})if __name__ __main__:app.run(host 0.0.0.0, port 5009)2.消息数据服务器# coding : utf-8# author : [Wang Suyin, ]# data : 2020/2/21 14:14# software : PyCharm# python_version : 3.5.3 64bit# file : 2.消息数据服务器.py说明文档import datetimeimport requestsimport jsonfrom flask import Flask,request,jsonifyappFlask(__name__)# 虚拟数据db {message_table: [{id: 1, message: 消息1, publish_time: (2020, 1, 20, 3, 4, 5)},{id: 2, message: 消息2, publish_time: (2020, 1, 20, 3, 4, 6)},{id: 3, message: 消息3, publish_time: (2020, 1, 20, 3, 4, 7)},],user_to_messages_table: [{username: tester, message_id: 1, read: True},{username: tester, message_id: 2, read: False},],}class Utils:staticmethoddef auth_admin_token(token):data {token: token}r requests.post(url http://127.0.0.1:5009/admin, json data)r json.loads(r.text)if r[code] 200:return Truereturn Falsestaticmethoddef add_message(message):t datetime.datetime.now()publish_time (t.year, t.month, t.day, t.hour, t.minute, t.second)db[message_table].append({id:len(db[message_table])1,message: message, publish_time: publish_time})staticmethoddef get_user_messages(username):print(db[message_table])# 拉取新消息入用户库# 原先考虑的是拉取最新时间后的消息目前看来求消息ID表的差集更简单些,也更保险些(防止遗漏)。当然要根据具体场景优化代码也要更换为数据库代码这里仅为示意。for message_id in list(set([e[id] for e in db[message_table]])- set([e[message_id] for e in [e_ for e_ in db[user_to_messages_table] if e_[username]username]])):db[user_to_messages_table].append({username: username, message_id: message_id, read: False})# 在通知量少的情况下可一次性返回所有内容。# 在通知量多的情况下可分页返回也可只返回未读消息已读消息由客户端本地保存。# 简单起见以下一次性返回所有内容res []for e in db[user_to_messages_table]:if e[username] username:d [e_ for e_ in db[message_table] if e_[id] e[message_id]][0]d.update({read: e[read]})res.append(d)return resstaticmethoddef mark_as_read(username, message_id,read_status):#将消息标记为已读、未读状态比较简单就不具体谢写了passapp.route(/add_message, methods[POST])def add_message():data request.jsontoken data[token]r requests.post(urlhttp://127.0.0.1:5009/admin,json{ token:token } )r json.loads(r.text)if not r[code] 200:return jsonify({code:400,msg:鉴权失败})Utils.add_message(data[message])print(新的消息入库成功)return jsonify({code: 200, msg: 消息发布成功})app.route(/get_user_messages, methods[POST])def get_user_messages():data request.jsontoken data[token]if not Utils.auth_admin_token(token):return jsonify({code:400,msg:鉴权失败})username tokendata Utils.get_user_messages(username)return jsonify({code: 200, msg: 拉取用户消息成功,data:data})app.route(/mark_as_read, methods[POST])def mark_as_read():data request.jsontoken data[token]if not Utils.auth_admin_token(token):return jsonify({code:400,msg:鉴权失败})username tokenmessage_id data[message_id]read_status data[read_status]data Utils.mark_as_read(username, message_id,read_status)return jsonify({code: 200, msg: 拉取用户消息成功,data:data})if __name__ __main__:app.run(host 0.0.0.0, port 5008)3.消息在线推送服务器# coding : utf-8# author : [Wang Suyin, ]# data : 2020/2/20 16:48# software : PyCharm# python_version : 3.5.3 64bit# file : 3.消息在线推送服务器.py说明文档import requestsimport jsonfrom flask import Flaskfrom flask_sockets import Socketsapp Flask(__name__)sockets Sockets(app)ws_pool [] # 推送目标池def auth_admin_token(token):data { token:token }r requests.post(urlhttp://127.0.0.1:5009/admin,jsondata )r json.loads(r.text)if r[code] 200:return Truereturn Falsedef auth_user_token(token):data { token:token }r requests.post(urlhttp://127.0.0.1:5009/user,jsondata )r json.loads(r.text)if r[code] 200:return Truereturn False#新消息的插入可以通过ws也可以通过httpsockets.route(/admin)def admin_socket(ws):print(admin接入)r_data ws.receive()r_data json.loads(r_data)token r_data[data][token]if not r_data[type]init or not auth_admin_token(token):ws.send(json.dumps({type:init_r,code: 400, msg: 鉴权失败}))ws.close()returnws.send(json.dumps({type:init_r,code: 200, msg: 鉴权成功}))while not ws.closed:r_data ws.receive()if not r_data:breakws.send(json.dumps({type:message_r,code:200, msg:发布成功}))data json.loads(r_data)if data[type] message:print(将消息推送给{}.format(ws_pool))print(ws_pool)#推送给在线用户for e in ws_pool:try:e.send(json.dumps({type:message,data:{message:data[data][message]}}))except:ws_pool.remove(e)try:ws.close()except:passsockets.route(/listener)def listener_socket(ws):print(listener接入)r_data ws.receive()r_data json.loads(r_data)token r_data[data][token]if not r_data[type]init or not auth_user_token(token):ws.send(json.dumps({type:init_r,code: 400, msg: 鉴权失败}))ws.close()returnws.send(json.dumps({type:init_r,code: 200, msg: 鉴权成功}))ws_pool.append(ws)while not ws.closed:r_data ws.receive()if not r_data:break#这里阻塞住就可以了因为消息监听器只接收消息try:ws.close()except:passfinally:ws_pool.remove(ws)if __name__ __main__:from gevent import pywsgifrom geventwebsocket.handler import WebSocketHandlerfrom gevent import monkeymonkey.patch_all()server pywsgi.WSGIServer((0.0.0.0, 5003), app, handler_class WebSocketHandler)print(web server start ... )server.serve_forever()4.管理员消息发布器import jsonimport requestsimport websocketwebsocket.enableTrace(True)# 这里就不写界面了要推送的消息一并写在on_message里class OnlinePublisher:_message def __init__(self, url ws://127.0.0.1:5003/admin):self.__ws ws websocket.WebSocketApp(url)ws.on_open self.on_openws.on_message self.on_messagews.on_error self.on_errorws.on_close self.on_closedef publish(self, message):OnlinePublisher._message messageself.__ws.run_forever()staticmethoddef on_open(ws):token admindata json.dumps({type: init, data: {token: token}})ws.send(data)staticmethoddef on_message(ws, r_data):print(接收到消息{}.format(r_data))data json.loads(r_data)if data[type] init_r:if data[code] ! 200:ws.close()raise Exception(鉴权失败)ws.send(json.dumps({type: message, data: {message: OnlinePublisher._message}}))elif data[type] message_r:if data[code] 200:print(发布成功)else:raise Exception(发布失败)ws.close()staticmethoddef on_error(ws):print(连接异常)staticmethoddef on_close(ws):print(连接关闭)def online_publish(message):OnlinePublisher().publish(message)def offline_publish(message):data {token: admin, message: message}r requests.post(url http://127.0.0.1:5008/add_message, json data)print(r.text)r json.loads(r.text)if r[code] 400:raise Exception(鉴权失败)print(消息入库成功)if __name__ __main__:message 测试消息这是一条公告online_publish(message)offline_publish(message)5.用户消息监听器import jsonimport sysimport datetimeimport threadingimport requestsfrom PyQt5.QtWidgets import *from PyQt5.QtGui import *from PyQt5.QtCore import *import websocketwebsocket.enableTrace(True)class ActionSet:staticmethoddef on_open(ws):token userdata json.dumps({type: init, data: {token: token}})ws.send(data)staticmethoddef on_message(ws, r_data):print(接收到消息{}.format(r_data))data json.loads(r_data)if data[type] init_r:if data[code] ! 200:ws.close()print(鉴权失败)returnprint(鉴权成功)if data[type] message:print(data[data][message])MainWindow._instance.new_message_got.emit(data[data][message])staticmethoddef on_error(ws):print(连接异常)staticmethoddef on_close(ws):print(连接关闭)class MainWindow(QWidget):_instance Nonenew_message_got pyqtSignal(str)def __init__(self, parent None):super().__init__(parent)MainWindow._instance selfself.setWindowTitle(消息监听器)self.resize(800, 600)self.__btn_online_connect QPushButton(连接)self.__lw_online_message QListWidget()self.__le_username QLineEdit()self.__le_username.setPlaceholderText(要拉取消息的用户名)self.__btn_pull_all_messages QPushButton(拉取所有消息/刷新)self.__lw_all_messages QListWidget()main_layout QVBoxLayout()main_layout.setSpacing(0)main_layout.setContentsMargins(0, 0, 0, 0)self.setLayout(main_layout)main_layout.addWidget(self.__btn_online_connect)main_layout.addWidget(self.__lw_online_message)main_layout.addWidget(self.__le_username)main_layout.addWidget(self.__btn_pull_all_messages)main_layout.addWidget(self.__lw_all_messages)self.__btn_online_connect.clicked.connect(self.__on_btn_online_connect_clicked)self.new_message_got.connect(self.__show_new_message)self.__btn_pull_all_messages.clicked.connect(self.__on_btn_pull_all_messages_clicked)def __on_btn_online_connect_clicked(self):url ws://127.0.0.1:5003/listenerws websocket.WebSocketApp(url)ws.on_open ActionSet.on_openws.on_message ActionSet.on_messagews.on_error ActionSet.on_errorws.on_close ActionSet.on_closewst threading.Thread(target ws.run_forever)wst.setDaemon(True)wst.start()def __on_btn_pull_all_messages_clicked(self):data {token: self.__le_username.text()}r requests.post(url http://127.0.0.1:5008/get_user_messages, json data)r json.loads(r.text)if r[code] 400:raise Exception(鉴权失败)print(消息拉取成功)message_datas sorted(r[data][:],key lambda x:-x[id])self.__lw_all_messages.clear()for d in message_datas:self.__lw_all_messages.addItem(【{}】{}.format({True:已读,False:未读}[d[read]],d[message] ) )def __show_new_message(self, text):time_ datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S)text {} {}.format(time_, text)self.__lw_online_message.insertItem(0, text)if __name__ __main__:app QApplication(sys.argv)w MainWindow()w.show()app.exec_()