铜川市住房和城乡建设局网站,做软件界面一般用什么软件,wordpress上传视频只有声音,自己主机做多个网站pymysql的使用
1 驱动
MySQL基于TCP协议之上开发#xff0c;但是网络连接后#xff0c;传输的数据必须遵循MySQL的协议。 封装好MySQL协议的包#xff0c;就是驱动程序。
MySQL的驱动#xff1a; MySQLdb: 最有名的库。对MySQL的C Client封装实现#xff0c;支持Python…pymysql的使用
1 驱动
MySQL基于TCP协议之上开发但是网络连接后传输的数据必须遵循MySQL的协议。 封装好MySQL协议的包就是驱动程序。
MySQL的驱动 MySQLdb: 最有名的库。对MySQL的C Client封装实现支持Python2不更新了不支持Python3 MySQL官方Connector pymysql: 语法兼容MySQLdb使用Python写的库支持Python3
2 pymysql的使用
连接数据库pymysql.connect方法返回Connections模块下的Connection类的实例。
Connection类的事务管理Connection.begin开始事务 Connection.commit提交 Connection.rollback回滚。
游标cursor操作数据库必须使用游标。需先获取一个游标对象
Connection.cursor(cursorNone)方法返回一个新的游标对象 cursor参数可以指定一个Cursor类比如DictCursor类,默认为Cursor类。Cursor类Cursor.fetchone获取结果集的下一行Cursor.fetchmang获取结果集的指定行数Cursor.fetchall返回结果集的所有行。DictCursor类会将查询数据库结果集的每一行转换为字典key为列名value为值。
注意 Cursor的fetch操作是结果集结果集是保存在客户端的也就是说fetch的时候查询已经结束了。
数据库交互的一般流程 建立连接 – 获取游标 – 执行sql – 提交事务 – 释放资源
import pymysql
from pymysql.cursors import DictCursordef update_database():global connconn Nonecursor Nonetry:# 建立连接, 支持上下文with pymysql.connect(host127.0.0.1, userroot, passwordcli*963., databasetest) as conn:print(conn.ping(False)) # 测试数据库连接是否活着参数reconnect表示如果是断开时是否重连# 获取一个cursorcursor conn.cursor()for i in range(10): # 批量提交一般commit一般放到最后统一commit效率更高。sql insert into tee values (12, zhonghua_{}, 21).format(i)res cursor.execute(sql)print(res)conn.commit() # connection默认不commitautocommitFalseexcept Exception as err:print(err)conn.rollback() # 异常回滚finally:if cursor:cursor.close()def get_database():sql select * from teetry:with pymysql.connect(host127.0.0.1, userroot, passwordcli*963., databasetest) as _conn:with _conn.cursor() as cursor:line cursor.execute(sql)print(line)print(cursor.fetchone())print(cursor.fetchone())print(cursor.fetchmany(2))print(cursor.fetchmany(2))print(cursor.fetchall())print(cursor.rownumber)cursor.rownumber 0 # 改变游标位置指向初始位置cursor.rownumber -2 # 支持负向索引print(cursor.fetchone())print(cursor.rowcount) # 返回总行数# fetch操作的是结果集结果集是保存在客户端的也就是说fetch的时候查询结果已经结束了。print(*---------------------------*)with _conn.cursor(cursorDictCursor) as cursor: # Cursor类有一个Mixin的子类DictCursor将返回结果保证为dictcursor.execute(sql)print(cursor.fetchone())print(cursor.fetchone())print(cursor.fetchmany(2))print(cursor.fetchmany(2))print(cursor.fetchall())except Exception as err:print(error:, err)3 sql注入攻击
什么是SQL注入攻击 猜测后台数据库的查询语句使用拼接字符串的方式从而经过设计为服务端传参令其拼接出特殊字符串返回用户想要的结果。
例如使用字符串拼接sql语句select * from tee where id{}.format(10 or 11)进行查询。结果为select * from tee where id10 or 11where子句永远为真导致整个表格被查出来。
永远不要相信客户端传来的数据是规范的及安全的
如何解决注入攻击 参数化查询可以有效防止注入攻击并提高查询的效率。 Cursor.execute(query, argsNone)
args必须是元组、列表或字典。如果查询字符串使用%(name)s就必须使用字典。
import pymysql
from pymysql.cursors import DictCursordef sql_injection_test():sql select * from tee where id{}.format(10 or 11) # 字符串拼接使where子句永远为真导致整个表格被查出来。sql1 select * from tee where id%s# 解决注入攻击参数化查询可以有效防止注入攻击并提高查询的效率try:with pymysql.connect(host127.0.0.1, userroot, passwordcli*963., databasetest) as _conn:with _conn.cursor(DictCursor) as cursor:cursor.execute(sql) # 导致注入攻击print(aaaaaaaaaaaaaaa)print(cursor.fetchall())args (10 or 11,)cursor.execute(sql1, argsargs) # 参数化查询防止sql注入。 args可以是元组、列表、字典为字典时sql1 查询字符串必须是%(name)s格式print(bbbbbbbbbbbbbbb)print(cursor.fetchall())except Exception as err:print(err)参数化查询为什么提高效率 原因就是–SQL语句缓存。 数据库服务器一般会对SQL语句编译和缓存编译只对SQL语句部分所以参数中就算有SQL指令也不会被执行。
编译过程需要词法分析、语法分析、生成AST、优化、生成执行计划等过程比较耗费资源。服务端会先查找是否对同一条查询语句进行了缓存如果缓存未失效则不需要再次编译从而降低了编译的成本降低了内存消耗。
可以认为SQL语句字符串就是一个key如果使用拼接方案每次发过去的SQL语句都不一样都需要编译并缓存。
大量查询的时候首选使用参数化查询以节省资源。
开发时应该使用参数化查询。
注意这里说的是查询字符串的缓存不是查询结果的缓存。
4 pysql连接池实现
设计一个连接池可以设置池大小的容器连接池存放着数据库的连接。使用时从池中获取一个连接用完归还。从而减少频繁的创建、销毁数据库连接的过程提高性能。
设计
设计一个池对象ConnPool。构建时传入连接数据库的相关参数用户名、密码、主机、端口、数据库名。考虑多线程使用使用get从池中拿走一个连接用完归还。
import queue
import threading
import time
import loggingimport pymysql
from pymysql.cursors import DictCursorlogging.basicConfig(levellogging.INFO)class ConnPool:def __init__(self, size: int 10, *, host: str, user: str, password: str, database: str, **kwargs):if not isinstance(size, int) or size 1:size 10self.size sizeself._pool queue.Queue() # 使用队列作为连接池的容器。get时有连接则获取否则阻塞。for _ in range(size):self._pool.put(pymysql.connect(hosthost, useruser, passwordpassword, databasedatabase, **kwargs))self.local threading.local() # 使用threading.local记录每一个线程获取的连接实例用完归还def get_conn(self):# 一个线程多次拿连接场景单个线程未归还只能拿同一个保证threading.local.conn记录的是同一个连接if getattr(self.local, conn, None) is None:_conn self._pool.get()self.local.conn _connreturn self.local.conndef return_conn(self, _conn: pymysql.connect):if isinstance(_conn, pymysql.connect):self._pool.put(_conn)self.local.conn None # 归还连接后threading.local应置为None# threading.local只能解决不同线程使用conn的问题线程内必须同步方式使用自动拿连接并规划自动提交或回滚避免线程内多次拿连接或update多次后没有commit# 通过上下文实现自动连接、自动提交或回滚并归还连接。def __enter__(self):return self.get_conn()def __exit__(self, exc_type, exc_val, exc_tb):# __exit__前不知道归还的连接是哪一个正好使用threading.local记录当前线程获取的是哪一个连接if exc_type: # 存在异常回滚self.local.conn.rollback()else: # 没有异常提交self.local.conn.commit()self.return_conn(self.local.conn)def foo(_pool: ConnPool):# conn _pool.get_conn()time.sleep(3)with _pool as cur_conn: # 使用连接池的上下文with cur_conn.cursor(DictCursor) as cursor:cursor.execute(select * from tee where id%s, args(10,))logging.info({}:{}.format(threading.current_thread().name, cursor.rowcount))for column in cursor: # cursor是一个可迭代对象迭代的是查询的每一条记录logging.info({}:{}.format(threading.current_thread().name, column))if __name__ __main__:# update_database()# get_database()# sql_injection_test()pool ConnPool(host127.0.0.1, userroot, passwordcli*963., databasetest)# 连接池连接获取的游标cursor不能跨线程使用因为线程A用完cursor可能关闭而线程B还没使用这时候会抛异常。# 使用队列实现线程池也可以改用信号量来实现。for i in range(8):threading.Thread(targetfoo, args(pool,), namefoo_{}.format(i)).start()该连接池存在的问题
连接池连接获取的游标cursor不能跨线程使用因为线程A用完cursor可能关闭而线程B还没使用这时候会抛异常。使用队列实现线程池也可以改用信号量来实现。