马拉松网站建设方案,自己电脑做服务器网站,看济南新闻,中山建网站多少钱文章目录 一、Flask-session使用1.使用方式一2.使用方式二3.读RedisSessionInterface源码4.flask-session补充 二、数据库连接池1.flask中使用mysql2.上述问题解决 使用数据库连接池1.第三方数据库连接池2.操作数据库不带池版3.池版和非池版压测 三、wtforms四、Flask定制命令1… 文章目录 一、Flask-session使用1.使用方式一2.使用方式二3.读RedisSessionInterface源码4.flask-session补充 二、数据库连接池1.flask中使用mysql2.上述问题解决 使用数据库连接池1.第三方数据库连接池2.操作数据库不带池版3.池版和非池版压测 三、wtforms四、Flask定制命令1.使用 flask-script定制命令(老版本新版本不用了)2.新版本定制命令3.Django中自定制命令 五、Flask-Cache 一、Flask-session使用 Flask内置的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法 而在这中间有一个模块就做了这件事那就是flask-session以把数据存放到文件、redis、mongodb、关系型数据库等中 安装flask-session pip install flask-session1.使用方式一 from flask import Flask,sessionapp Flask(__name__)app.debugTrueapp.secret_keyjlkdoasfiuz# 只要使用session就需要secret_key1.安装flask-session pip install flask-session使用方式一2.导入(这里我写入到redis缓存数据库中)from flask_session import RedisSessionInterface3.把app.session_interface替换成RedisSessionInterface的对象# 替换了就会走RedisSessionInterface的open_session和save_sessionfrom redis import RedisconnRedis(host127.0.0.1,port6379,db2)# 需要传入的参数 redis, key_prefix, use_signer, permanent, sid_length1.redis 是传入链接的redis库链接对象2.key_prefix 是保存在redis中名称的前缀3.use_signer 是如果是False就无需配置secret_key默认设置True4.permanent 是关闭浏览器cookie是否失效5.sid_length 是生成session_key的长度会以cookie形式写入到浏览器cookie中但是去掉redis中session这个前缀去掉前缀就是session_key的长度限制app.session_interfaceRedisSessionInterface(redisconn,key_prefixsession,use_signerFalse,permanentTrue,sid_length32)app.route(/set_session)def set_session():session[name] jackreturn set_sessionapp.route(/get_session)def get_session():print(session.get(name))return get_sessionif __name__ __main__:app.run()2.使用方式二 from flask import Flask,sessionapp Flask(__name__)app.debugTrueapp.secret_keyjlkdoasfiuz# 只要使用session就需要secret_key1.安装flask-session pip install flask-session使用方式二2.在flask配置文件中加入配置from redis import Redis # 导入redisapp.config[SESSION_TYPE] redis # 配置链接的类型app.config[SESSION_REDIS]Redis(host127.0.0.1,port6379,db2)# app.config[SESSION_KEY_PREFIX] session # 如果不写默认以SESSION_COOKIE_NAME作为key# app.config.from_pyfile(./settings) # 第二种导入配置文件方式3.导入Sessionfrom flask_session import SessionSession(app) # 核心和方式一一模一样具体看源码app.route(/set_session)def set_session():session[name] jackreturn set_sessionapp.route(/get_session)def get_session():print(session.get(name))return get_sessionif __name__ __main__:app.run()3.读RedisSessionInterface源码 1.RedisSessionInterface的open_session(在它的父类中)def open_session(self, app, request):# -取到前端传入在cookie中得随机字符串sid request.cookies.get(app.config[SESSION_COOKIE_NAME])if not sid:sid self._generate_sid(self.sid_length)# 当sid不为空把sid传入到session_class得到对象return self.session_class(sidsid, permanentself.permanent)if self.use_signer: # 用来加密所以第一种方式的ues_signer最好不要改为Falsetry:sid self._unsign(app, sid)except BadSignature:sid self._generate_sid(self.sid_length)return self.session_class(sidsid, permanentself.permanent)return self.fetch_session(sid)def fetch_session(self, sid):# 取到随机字符串prefixed_session_id self.key_prefix sid# 从redis中取出key为前缀随机字符串对应的value值value self.redis.get(prefixed_session_id)if value is not None:try:# 解密成字符串session_data self.serializer.loads(value)# 把解密后的数据组装到 session对象中return self.session_class(session_data, sidsid)except pickle.UnpicklingError:return self.session_class(sidsid, permanentself.permanent)return self.session_class(sidsid, permanentself.permanent)2.RedisSessionInterface的save_session(在它自己内)def save_session(self, app, session, response):if not self.should_set_cookie(app, session):returndomain self.get_cookie_domain(app)path self.get_cookie_path(app)if not session: # 如果session有值if session.modified: # 如果值被修改过就把cookie中的删除并且删除redis中的self.redis.delete(self.key_prefix session.sid)response.delete_cookie(app.config[SESSION_COOKIE_NAME], domaindomain, pathpath)return# expiration_datetime self.get_expiration_time(app, session)serialized_session_data self.serializer.dumps(dict(session))# 放到redis中self.redis.set(nameself.key_prefix session.sid,valueserialized_session_data,extotal_seconds(app.permanent_session_lifetime), # 过期时间)# 把session对应的随机字符串放到cookie中self.set_cookie_to_response(app, session, response, expiration_datetime)4.flask-session补充 - session的前缀如果不传默认config.setdefault(SESSION_KEY_PREFIX, session:)- session过期时间通过配置如果不写会有默认PERMANENT_SESSION_LIFETIME: timedelta(days31),#这个配置文件控制-设置cookie时如何设定关闭浏览器则cookie失效permanentFalseapp.config[SESSION_PERMANENT] False二、数据库连接池
1.flask中使用mysql settings.pySECRET_KEY fdsjakluizDEBUG TrueMYSQL_USER rootMYSQL_HOST 127.0.0.1MYSQL_PORT 3306MYSQL_PASSWORD 1234MYSQL_DATABASE cnblogsJSON_AS_ASCII Falseapp.pyimport pymysql.cursorsfrom flask import Flask, jsonifyapp Flask(__name__)app.config.from_pyfile(./settings.py)# pymysql操作mysqlfrom pymysql import Connectconn Connect(userapp.config.get(MYSQL_USER),passwordapp.config.get(MYSQL_PASSWORD),hostapp.config.get(MYSQL_HOST),databaseapp.config.get(MYSQL_DATABASE),portapp.config.get(MYSQL_PORT),)# pymysql.cursors.DictCursor查出来的是列表套字典的形式cursor conn.cursor(pymysql.cursors.DictCursor)# cursor conn.cursor()# app.config[JSON_AS_ASCII] False # 前端显示json格式中文app.route(/)def articles():cursor.execute(select id,title,author from article limit 10)article_list cursor.fetchall() # 拿出所有return jsonify(article_list)if __name__ __main__:app.run()这种方式conn和cursor如果是全局出现如下问题 上面的 conn和cursor 都是全局的假设极端情况同时并发两个用户-一个用户查询所有文章-一个用户查询所有用户在线程中全局变量是共享的就会出现第一个线程拿着cursor执行了cursor.execute(select id,title,author from article limit 10)然后第二个线程拿着 cursor 执行了cursor.execute(select id,name from user limit 10)第一个线程开始执行用的全是同一个cursorarticle_list cursor.fetchall()就会出现查询article的cursor取出来的数据是 用户相关数据---》出现数据错乱2.上述问题解决 每个人线程用自己的从conn和cursor在视图函数中拿到链接和cursor import pymysql.cursorsfrom flask import Flask, jsonifyapp Flask(__name__)app.config.from_pyfile(./settings.py)# pymysql操作mysqlfrom pymysql import Connectapp.route(/)def articles():conn Connect(userapp.config.get(MYSQL_USER),passwordapp.config.get(MYSQL_PASSWORD),hostapp.config.get(MYSQL_HOST),databaseapp.config.get(MYSQL_DATABASE),portapp.config.get(MYSQL_PORT),)cursor conn.cursor(pymysql.cursors.DictCursor)cursor.execute(select id,title,author from article limit 10)article_list cursor.fetchall() # 拿出所有return jsonify(article_list)app.route(/desc)def desc():conn Connect(userapp.config.get(MYSQL_USER),passwordapp.config.get(MYSQL_PASSWORD),hostapp.config.get(MYSQL_HOST),databaseapp.config.get(MYSQL_DATABASE),portapp.config.get(MYSQL_PORT),)cursor conn.cursor(pymysql.cursors.DictCursor)cursor.execute(select id,real_desc from article limit 10)article_list cursor.fetchall() # 拿出所有return jsonify(article_list)if __name__ __main__:app.run() 但是这种如果并发量过高就会出现连接数过多的问题mysql的性能就降低了 使用数据库连接池 上述操作存在的问题1.原生pymysql操作最好有一个rom----sqlalchemy2.并发问题conn和cursor要做成单例还是每个视图函数一个-如果使用单例数据会错乱-咱们需要每个视图函数哪一个链接如果并发数过多mysql链接数就会很多所以使用连接池解决# django orm操作一个请求就会拿到一个mysql的链接用完后就释放所以想要彻底解决得使用数据库连接池-限定 mysql链接最多无论多少线程操作都是从池中取链接使用解决上面的两个问题-数据库连接池-创建一个全局的池-每次进入视图函数从池中取一个连接使用使用完放回到池中只要控制池的大小就能控制mysql连接数1.第三方数据库连接池 pool.py 在这个文件中配置池也也可以在视图函数中配置# 1 安装 pip install dbutils# 2 使用实例化得到一个池对象---》池是单例from dbutils.pooled_db import PooledDBimport pymysqlPOOLPooledDB(creatorpymysql, # 使用链接数据库的模块maxconnections6, # 连接池允许的最大连接数0和None表示不限制连接数mincached2, # 初始化时链接池中至少创建的空闲的链接0表示不创建maxcached5, # 链接池中最多闲置的链接0和None不限制maxshared3,# 链接池中最多共享的链接数量0和None表示全部共享。PS: 无用因为pymysql和MySQLdb等模块的 threadsafety都为1所有值无论设置为多少_maxcached永远为0所以永远是所有链接都共享。blockingTrue, # 连接池中如果没有可用连接后是否阻塞等待。True等待False不等待然后报错maxusageNone, # 一个链接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。如[set datestyle to ..., set time zone ...]ping0,# ping MySQL服务端检查是否服务可用。# 如0 None never, 1 default whenever it is requested, 2 when a cursor is created, 4 when a query is executed, 7 alwayshost127.0.0.1,port3306,userroot,password1234,databasecnblogs,charsetutf8)app.py 视图函数from flask import Flask,jsonifyapp Flask(__name__)app.config.from_pyfile(./settings.py)import timefrom pool import POOLfrom pymysql.cursors import DictCursor## 3 在视图函数中导入使用app.route(/article)def article():conn POOL.connection()cursor conn.cursor(DictCursor)# 获取10条文章cursor.execute(select id,title,author from article limit 10)time.sleep(1)# 切换res cursor.fetchall()print(res)return jsonify({code: 100, msg: 成功, result: res})if __name__ __main__:app.run()2.操作数据库不带池版 from flask import Flask,jsonifyapp Flask(__name__)app.config.from_pyfile(./settings.py)import time## 3 在视图函数中导入使用app.route(/article)def article():import pymysqlfrom pymysql.cursors import DictCursorconn pymysql.connect(userroot,password1234,host127.0.0.1,databasecnblogs,port3306)cursor conn.cursor(DictCursor)# 获取10条文章cursor.execute(select id,title,author from article limit 10)time.sleep(1)# 切换res cursor.fetchall()print(res)return jsonify({code: 100, msg: 成功, result: res})if __name__ __main__:app.run(port5001)3.池版和非池版压测 压测代码 jmeter工具---》javaimport requestsfrom threading import Thread# 没有连接池def task():# res requests.get(http://127.0.0.1:5000/article) # 带连接池版res requests.get(http://127.0.0.1:5001/article) # 不带连接池版print(res.json())if __name__ __main__:l []for i in range(100):t Thread(targettask)t.start()l.append(t)for i in l:i.join()效果是使用池的连接数明显小不使用池连接数明显很大查看数据库连接数show status like %Threads%; 三、wtforms wtforms是一个支持多个web框架的form组件主要用于对用户请求数据进行验证、渲染错误信息、渲染页面 app.py
from flask import Flask,render_template,request,redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgetsappFlask(__name__)
app.debugTrueclass LoginForm(Form):# 字段内部包含正则表达式name simple.StringField(label用户名,validators[validators.DataRequired(message用户名不能为空.),validators.Length(min6, max18, message用户名长度必须大于%(min)d且小于%(max)d)],widgetwidgets.TextInput(), # 页面上显示的插件render_kw{class: form-control})# 字段内部包含正则表达式pwd simple.PasswordField(label密码,validators[validators.DataRequired(message密码不能为空.),validators.Length(min8, message用户名长度必须大于%(min)d),validators.Regexp(regex^(?.*[a-z])(?.*[A-Z])(?.*\d)(?.*[$$!%*?])[A-Za-z\d$$!%*?]{8,},message密码至少8个字符至少1个大写字母1个小写字母1个数字和1个特殊字符)],widgetwidgets.PasswordInput(),render_kw{class: form-control})app.route(/login, methods[GET, POST])
def login():if request.method GET:form LoginForm()return render_template(login.html, formform)else:form LoginForm(formdatarequest.form)if form.validate():print(用户提交数据通过格式验证提交的值为, form.data)else:print(form.errors)return render_template(login.html, formform)if __name__ __main__:app.run()login.html !DOCTYPE htmlhtml langenheadmeta charsetUTF-8titleTitle/title/headbodyh1登录/h1form methodpostp{{form.name.label}} {{form.name}} {{form.name.errors[0] }}/pp{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}/pinput typesubmit value提交/form/body/html四、Flask定制命令
1.使用 flask-script定制命令(老版本新版本不用了) flask 老版本中没有命令运行项目自定制命令flask-script 解决了这个问题flask项目可以通过命令运行可以定制命令新版的flask--》官方支持定制命令 click 定制命令这个模块就弃用了flask-migrate 老版本基于flask-script新版本基于flask-click写的使用步骤-1 pip3 install Flask-Script2.0.3-2 pip3 install flask1.1.4-3 pip3 install markupsafe1.1.1-4 使用from flask_script import Managermanager Manager(app)if __name__ __main__:manager.run()-5 自定制命令manager.commanddef custom(arg):自定义命令python manage.py custom 123print(arg)- 6 执行自定制命令python manage.py custom 1232.新版本定制命令 from flask import Flaskimport clickapp Flask(__name__)自定制命令通过create-user传入用户名就可以创建一个用户来app.cli.command(create-user)click.argument(name)def create_user(name):# from pool import POOL# conn POOL.connection()# cursorconn.cursor()# cursor.excute(insert into user (username,password) values (%s,%s),args[name,hello123])# conn.commit()print(name)命令行中执行-flask --app .\Flask定制命令.py:app create-user jack-简写成 前提条件式app所在的py文件名叫app.py-flask create-user jackapp.route(/)def index():return index-运行项目的命令flask --app .\Flask定制命令.py:app runif __name__ __main__:app.run()3.Django中自定制命令 1 app下新建文件夹management/commands/2 在该文件夹下新建py文件随便命名命令名3 在py文件中写代码from django.core.management.base import BaseCommandclass Command(BaseCommand):help 命令提示def handle(self, *args, **kwargs):命令逻辑 4 使用命令python manage.py py文件(命令名)五、Flask-Cache 具体使用可以自寻去官方查看https://flask-caching.readthedocs.io/en/latest/ from flask import Flask,render_template# 安装 pip install Flask-Cachingfrom flask_caching import Cacheconfig {DEBUG: True,CACHE_TYPE: SimpleCache,CACHE_DEFAULT_TIMEOUT: 300}app Flask(__name__)app.config.from_mapping(config)cache Cache(app)app.route(/)cache.cached(timeout50)def index():return render_template(index.html)app.route(/set_cache)def set_cache():cache.set(name,xxxx)return set_cacheapp.route(/get_cache)def get_cache():res cache.get(name)return resif __name__ __main__:app.run()