代做财务报表分析网站,seo文章生成器,如何为网站做面包屑导航,找工作临时工简单介绍pymysql的一些操作#xff0c;增改删查
增
先建表#xff0c;再写数据至表中 除查询操作外#xff0c;增改删都需要commit操作#xff0c;具体原理看ref.1
import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings(igno… 简单介绍pymysql的一些操作增改删查
增
先建表再写数据至表中 除查询操作外增改删都需要commit操作具体原理看ref.1
import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings(ignore)建表
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
create_sql
create table user(id int NOT NULL AUTO_INCREMENT,name varchar(50) NOT NULL,age int NOT NULL,PRIMARY KEY (id)
)ENGINEInnoDB DEFAULT CHARSETutf8;try:# 执行sql语句cur.execute(create_sql)# 执行sql语句con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
desc user;try:# 执行sql语句cur.execute(sql)get_df pd.DataFrame(cur.fetchall())print(get_df)# 执行sql语句con.commit()
except:# 发生错误时回滚con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()Field Type Null Key Default Extra
0 id int NO PRI None auto_increment
1 name varchar(50) NO None
2 age int NO None 插入数据
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
row_nums 500000
sql insert into user(name, age)values(小明, 14)
try:# 执行sql语句t1 time.time()for i in range(row_nums):cur.execute(sql)con.commit() # 提交t2 time.time()print(f循环写入耗时:{t2 - t1}) # 7s
except:# 发生错误时回滚con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()循环写入耗时:39.632535457611084批量写入
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
row_nums 500000
sql insert into user(name, age) values(%s,%s)
citys [(小明, 14) for i in range(row_nums)
]try:# 执行sql语句t1 time.time()# citys是参数组合每个元素对应一行insert sql的对应字段可以是元组也可以是列表cur.executemany(sql, citys) # 批量执行con.commit() # 提交t2 time.time()print(f批量写入耗时:{t2 - t1}) # 7s
except:# 发生错误时回滚con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()批量写入耗时:5.722973823547363批量写入有明显的速度优势注意insert into user(name, age) values(%s,%s)values前面有空格具体原因看ref.2
pyspark批量写入
数据量巨大时可以结合spark的foreachPartition算子并行写入
import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *def get_or_create_hudi(app_name):spark SparkSession \.builder \.appName(app_name) \.config(spark.driver.maxResultSize, 10g) \.config(spark.sql.execution.arrow.enabled, true) \.config(spark.dynamicAllocation.enabled, false) \.config(spark.sql.crossJoin.enabled, true) \.config(spark.kryoserializer.buffer.max, 512m) \.config(spark.io.compression.codec, snappy) \.config(spark.sql.hive.convertMetastoreParquet, false) \.config(spark.hadoop.dfs.namenode.acls.enabled, false) \.config(spark.sql.hive.convertMetastoreParquet, false) \.config(spark.sql.extensions, org.apache.spark.sql.hudi.HoodieSparkSessionExtension) \.config(spark.serializer, org.apache.spark.serializer.KryoSerializer) \.enableHiveSupport() \.getOrCreate()spark.sparkContext.setLogLevel(ERROR)print(\n)print(\n)return sparkdef insert2mysql_partrdd(part, db_param, value_cols[name, age], batch40000):param part:param db_param: mysql配置信息param value_cols: insert 列名称param batch: 批插入数据量return:con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)cur con.cursor(cursorpymysql.cursors.DictCursor)cnt 0batch_list []sql sql insert into user(name, age) values(%s,%s)for row in part:# 这个操作可能会比较耗时。。有没有好方法优化下batch_list.append([row[i] for i in value_cols])cnt cnt 1if cnt 0 and cnt % batch 0:cur.executemany(sql, batch_list)con.commit() # 提交batch_list []print(f第{cnt - batch}-{cnt}行数据插入MySql!)# 最后一波数据如果不是batch余数也推过去if cnt % batch ! 0:cur.executemany(sql, batch_list)con.commit() # 提交print(f第{cnt - cnt % batch}-{cnt}行数据插入MySql!)if cnt 0:print(f数据抽样-key:{row})print(fcnt:{cnt})else:print(该分区无数据)cur.close()con.close()row_nums 500000df pd.DataFrame({name: [小明] * row_nums, age: [14] * row_nums})
spark get_or_create_hudi(test)
spark_df spark.createDataFrame(df).repartition(10)t1 time.time()
spark_df.rdd.foreachPartition(functools.partial(insert2mysql_partrdd, batch50000))
t2 time.time()
print(fspark批写入耗时:{t2 - t1}) # 1.2sspark批写入耗时:8.034992456436157速度上似乎没有更快可能数据量再大些会有效果另单机跑spark也可能有些影响
删
刚才搞了100w数据删除些
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
delete from user where id10try:# 执行sql语句cur.execute(sql)# 执行sql语句con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
select count(*) as cnt from usertry:# 执行sql语句cur.execute(sql)get_df pd.DataFrame(cur.fetchall())print(get_df)# 执行sql语句# con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()cnt
0 10还剩10条数据
查
结合pandas把查询的数据转成df
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
select * from user limit 100try:# 执行sql语句cur.execute(sql)get_df pd.DataFrame(cur.fetchall())print(get_df)# 执行sql语句# con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()id name age
0 1 小明 14
1 2 小明 14
2 3 小明 14
3 4 小明 14
4 5 小明 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14改
con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
update user set name 小红 where id5try:# 执行sql语句cur.execute(sql)# 执行sql语句con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()con pymysql.connect(hostlocalhost,port3306,userroot,password12345,dbai,charsetutf8)
# 创建游标(默认数据返回tuple,修改为dict)
cur con.cursor(cursorpymysql.cursors.DictCursor)
sql
select * from user limit 100try:# 执行sql语句cur.execute(sql)get_df pd.DataFrame(cur.fetchall())print(get_df)# 执行sql语句# con.commit()
except:# 发生错误时回滚print(发生错误回滚)con.rollback()# 关闭数据库连接
con.close()id name age
0 1 小红 14
1 2 小红 14
2 3 小红 14
3 4 小红 14
4 5 小红 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14Ref
[1] https://www.runoob.com/python3/python3-mysql.html [2] https://www.modb.pro/db/184700 2023-07-28 台风、大雨 于南京市江宁区