如何为旅游网站店铺做推广营销,韩国网页设计欣赏,网络营销案例视频,房地产网站建设报价在接触的一些项目中#xff0c;有时为了方便可视化一些服务状态#xff08;请求数很少#xff09;#xff0c;那么很容易想到使用http服务来实现。但开源的web后端框架#xff0c;例如flask#xff0c;fastapi#xff0c;django等略显沉重#xff0c;且使用这些框架会有…在接触的一些项目中有时为了方便可视化一些服务状态请求数很少那么很容易想到使用http服务来实现。但开源的web后端框架例如flaskfastapidjango等略显沉重且使用这些框架会有各种各样的限制为了更加灵活的使用可以自己通过Python自带的socket库来实现下面是我简单实现的代码贴在这里方便后续使用。
客户端代码
import os
import time
import json
import socket
import datetime
import traceback
from urllib import parseENCODING utf-8class CustomRouter:def __init__(self):self.router_dict {}def register(self, url: str, method: str):if method not in [GET, POST]:raise ValueError(fmethod only support GET or POST, got {method})key f{url}:{method}if key in self.router_dict:raise ValueError(furl:{url} method:{method} already registed)def _register(func):self.router_dict[key] funcreturn funcreturn _registerdef has_register(self, url: str, method: str):key f{url}:{method}return key in self.router_dictdef request(self, url: str, method: str):key f{url}:{method}func self.router_dict[key]return func()router CustomRouter()router.register(url/, methodGET)
def task():return Hello World!!!router.register(url/get_project_info, methodGET)
def get_project_info():info {pwd: os.path.abspath(os.curdir),encoding: ENCODING,time: datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S)}return json.dumps(info)class CustomServer:def __init__(self, port: int 5055, backlog: int 2):self.server socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server.bind((, port))self.server.listen(backlog)self.buff_size 1024self.encoding ENCODINGself.router routerself.timeout 1.0def parse_message(self, message: str):method, url_params, message message.split( , maxsplit2)res parse.urlparse(url_params)url res.pathparams dict([i for i in parse.parse_qsl(res.query)])version, message message.split(\r\n, maxsplit1)return method, url, params, versiondef run(self):while True:conn, addr self.server.accept()# conn.settimeout(self.timeout)with conn:try:message conn.recv(self.buff_size).decode(self.encoding)t0 time.perf_counter()method, url, params, version self.parse_message(message)if not self.router.has_register(url, method):conn.send(HTTP/1.1 501 Not Implemented\r\n\r\n.encode(self.encoding))print(f[{method}][501]client addr: {addr} request: {url}, time: {time.perf_counter() - t0:.5f})continueres self.router.request(url, method)conn.send(HTTP/1.1 200 OK\r\n\r\n.encode(self.encoding))conn.send(res.encode(self.encoding))print(f[{method}][200]client addr: {addr} request: {url}, time: {time.perf_counter() - t0:.5f})except (BrokenPipeError, TimeoutError):print(traceback.format_exc())except:print(traceback.format_exc())conn.send(HTTP/1.1 500 Inner Error\r\n\r\n.encode(self.encoding))print(f[{method}][500]client addr: {addr} request: {url}, time: {time.perf_counter() - t0:.5f})if __name__ __main__:s CustomServer()s.run()
客户端代码
import requestsdef main():for _ in range(50):res requests.get(http://127.0.0.1:5055)print(res.status_code, res.text)res requests.get(http://127.0.0.1:5055/get_project_info)print(res.status_code, res.json())if __name__ __main__:main()
客户端请求时服务端终端输出
[GET][200]client addr: (127.0.0.1, 38054) request: /, time: 0.00002
[GET][200]client addr: (127.0.0.1, 38058) request: /get_project_info, time: 0.00004
[GET][200]client addr: (127.0.0.1, 38060) request: /, time: 0.00002
[GET][200]client addr: (127.0.0.1, 38070) request: /get_project_info, time: 0.00004
[GET][200]client addr: (127.0.0.1, 38080) request: /, time: 0.00002
[GET][200]client addr: (127.0.0.1, 38096) request: /get_project_info, time: 0.00004
[GET][200]client addr: (127.0.0.1, 38098) request: /, time: 0.00002
[GET][200]client addr: (127.0.0.1, 38114) request: /get_project_info, time: 0.00009
[GET][200]client addr: (127.0.0.1, 38120) request: /, time: 0.00003
[GET][200]client addr: (127.0.0.1, 38122) request: /get_project_info, time: 0.00006
[GET][200]client addr: (127.0.0.1, 38136) request: /, time: 0.00003
[GET][200]client addr: (127.0.0.1, 38146) request: /get_project_info, time: 0.00006
[GET][200]client addr: (127.0.0.1, 38160) request: /, time: 0.00003
[GET][200]client addr: (127.0.0.1, 38172) request: /get_project_info, time: 0.00005
[GET][200]client addr: (127.0.0.1, 38178) request: /, time: 0.00002
[GET][200]client addr: (127.0.0.1, 38182) request: /get_project_info, time: 0.00005客户端终端输出
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}
200 Hello World!!!
200 {pwd: /home/wz/my_projects/others/socket_t, encoding: utf-8, time: 2025-01-14 00:20:01}