龙岗做网站公司szaow,美术馆网站建设,企业网站建设费用大约多少钱,中国建设教育协会网站打不开目录#xff1a; 每篇前言#xff1a;DBUtils库模式一#xff08;底层使用threading.local实现#xff09;#xff1a;模式二#xff1a; Flask中使用方式一#xff1a;直接将DBUtils初始化放到settings.py文件中方式二#xff1a;从utils文件夹中导入 脚本使用DBUtils… 目录 每篇前言DBUtils库模式一底层使用threading.local实现模式二 Flask中使用方式一直接将DBUtils初始化放到settings.py文件中方式二从utils文件夹中导入 脚本使用DBUtils代码demo 每篇前言 作者介绍【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者 本文已收录于Flask框架从入门到实战专栏《Flask框架从入门到实战》热门专栏推荐《Python全栈系列教程》、《爬虫从入门到精通系列教程》、《爬虫进阶实战系列教程》、《Scrapy框架从入门到实战》、《Flask框架从入门到实战》、《Django框架从入门到实战》、《Tornado框架从入门到实战》、《前端系列教程》。本专栏面向广大程序猿为的是大家都做到Python全栈技术从入门到精通穿插有很多实战优化点。订阅专栏后可私聊进一千多人Python全栈交流群手把手教学问题解答 进群可领取Python全栈教程视频 多得数不过来的计算机书籍基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。加入我一起学习进步一个人可以走的很快一群人才能走的更远 引子
在上一个demo项目中登录部分验证是直接写死的本文模拟实际生产查询MySQL数据库做验证。
一个很low的方法是 项目根目录下创建utils/sql.py
import pymysqlclass SQLHelper(object):staticmethoddef open():conn pymysql.connect(host127.0.0.1, port3306, password123456, dbUserInfo)cursor conn.cursor(cursorpymysql.cursors.DictCursor)return conn, cursorstaticmethoddef close(conn, cursor):conn.commit()cursor.close()conn.close()classmethoddef fetch_one(cls, sql, args):conn, cursor cls.open()cursor.execute(sql, args)obj cursor.fetchone()cls.close(conn, cursor)return obj 上面这种用法很严重且明显的一个问题是 每次登录一次都要和数据库创建一个连接
解决方法就是使用DBUtils三方库
DBUtils库
pip install DBUtils1.3DBUtils 是一套用于管理数据库连接池的Python包为高频度高并发的数据库访问提供更好的性能可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装。
这种连接池有两种连接模式 PersistentDB提供线程专用的数据库连接并自动管理连接。 为每个线程创建一个连接线程即使调用了close方法也不会关闭只是把连接重新放到连接池供自己线程再次使用。当线程终止时连接自动关闭 PooledDB提供线程间可共享的数据库连接并自动管理连接。 创建一批连接到连接池供所有线程共享使用。 PS由于pymysql的threadsafety值为1而DBUtils库用的内部又是用的pymysql所以该模式连接池中的线程会被所有线程共享。
实测证明 PersistentDB 的速度是最高的即第一种模式但是在某些特殊情况下数据库的连接过程可能异常缓慢而此时的PooledDB即模式二所以推荐这个则可以提供相对来说平均连接时间比较短的管理方式。
模式一底层使用threading.local实现
了解即可~~~
from DBUtils.PersistentDB import PersistentDB
import pymysqlPOOL PersistentDB(creatorpymysql, # 连接数据库的模块maxusageNone, # 一个数据库连接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。如[set time zone ...]ping0, # ping MySQL客户端检查服务是否可用。 【一般用0,4,7】# 0 None never; 1 default whenever if is requested; 2 when a cursor is created; 4 when a query is executed; 7 alwayscloseableFalse, # Falseconn.close()实际上被忽略供下次使用在线程关闭时才会自动关闭连接Trueconn.close()则关闭连接再次调用就是一个新的连接了threadlocalNone, # 本线程独享值的对象用于保存链接对象如果链接对象被重置也清除host127.0.0.1,port6379,userroot,password123456,databaseUserInfo,charsetutf8
)使用
def demo():conn POOL.connection(shareableFalse)cursor conn.cursor()cursor.execute(select * from users)result cursor.fetchall()cursor.close() conn.close()
模式二
用这个~~~
from DBUtils.PooledDB import PooledDB
import pymysqlPOOL PooledDB(creatorpymysql, # 连接数据库的模块maxconnections6, # 连接池允许的最大连接数 0和None表示不限制mincached2, # 初始化时连接池中至少创建的空闲的连接0表示不创建maxcached5, # 连接池中最多闲置的连接0和None不限制maxshared3, # 连接池中最多共享的连接数量0和None表示全部共享【默认为0而且哪怕设置别的值也无用下面会将为啥】blockingTrue, # 连接池中如果没有可用连接后是否阻塞等待。True等待False不等待直接报错maxusageNone, # 一个连接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。如[set time zone ...]ping0, # ping MySQL客户端检查服务是否可用。 【一般用0,4,7】# 0 None never; 1 default whenever if is requested; 2 when a cursor is created; 4 when a query is executed; 7 alwayshost127.0.0.1,port6379,userroot,password123456,databaseUserInfo,charsetutf8
) 为什么设置maxshared参数的值是无用的因为DBUtils使用的pymysql而pymysql内部threadsafety值为1。看源码 import pymysqlDBUtils源码 from DBUtils.PooledDB import PooledDB使用
def demo():检测当前正在运行连接数是否小于最大连接数如果不小于则等待或报错raise TooManyConnection异常否则则优先去初始化时创建的连接中获取连接SteadyDBConnection然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。如果最开始创建的连接没有连接则去创建一个SteadyDBConnection对象再封装到PooledDedicatedDBConnection中并返回。一旦关闭连接后连接就返回到连接池让后续线程继续使用~conn POOL.connection()cursor conn.cursor()cursor.execute(select * from users)result cursor.fetchall()conn.close()Flask中使用
方式一直接将DBUtils初始化放到settings.py文件中 方式二从utils文件夹中导入 脚本使用DBUtils代码demo
# codingutf-8使用DBUtils数据库连接池中的连接操作数据库import datetime
import pymysql
from DBUtils.PooledDB import PooledDBclass MysqlClient(object):def __init__(self, **kwargs):self.pool self.create_pool(**kwargs)self.connection Noneself.cursor Nonedef create_pool(self, **kwargs):return PooledDB(pymysql,mincachedkwargs.get(mincached, 10),maxcachedkwargs.get(maxcached, 20),maxsharedkwargs.get(maxshared, 10),maxconnectionskwargs.get(maxconnections, 200),blockingkwargs.get(blocking, True),maxusagekwargs.get(maxusage, 100),setsessionkwargs.get(setsession, None),resetkwargs.get(reset, True),hostkwargs.get(host, 127.0.0.1),portkwargs.get(port, 3306),dbkwargs.get(db, mysqldemo),userkwargs.get(user, root),passwdkwargs.get(passwd, 123456),charsetkwargs.get(charset, utf8mb4),cursorclasspymysql.cursors.DictCursor)def get_conn_cursor(self):self.connection self.pool.connection()self.cursor self.connection.cursor()def close(self):try:if self.cursor:self.cursor.close()if self.connection:self.connection.close()except Exception as e:print(e)def execute(self, sql, param()):try:self.get_conn_cursor()count self.cursor.execute(sql, param)print(count)return countfinally:self.close()def __dict_datetime_obj_to_str(self, result_dict):把字典里面的datetime对象转成字符串使json转换不出错if result_dict:return {k: v.__str__() if isinstance(v, datetime.datetime) else v for k, v in result_dict.items()}return result_dictdef select_one(self, sql, param()):查询单个结果try:self.get_conn_cursor()count self.execute(sql, param)result self.cursor.fetchone()result self.__dict_datetime_obj_to_str(result)return count, resultfinally:self.close()def select_many(self, sql, param()):查询多个结果try:self.get_conn_cursor()count self.execute(sql, param)result self.cursor.fetchall()result [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]return count, resultfinally:self.close()def begin(self):开启事务try:self.get_conn_cursor()self.connection.autocommit(False)except Exception as e:print(e)def end(self, optioncommit):结束事务try:if option commit:self.connection.commit()else:self.connection.rollback()except Exception as e:print(e)finally:self.connection.autocommit(True)if __name__ __main__:mc MysqlClient()sql1 SELECT * FROM customers WHERE customerNumber 103result1 mc.select_one(sql1)print(result1[1])sql2 SELECT * FROM customers WHERE customerNumber IN (%s,%s,%s)param (103, 144, 145)print(mc.select_many(sql2, param)[1])