网站建设是一个什么的过程,企业文化墙,设计方案万能模板,网站开发 常德Spark读写MySQL数据库 文章目录 Spark读写MySQL数据库一、读取数据库#xff08;一#xff09;通过RDD的方式读取MySQL数据库#xff08;二#xff09;通过DataFrame的方式读取MySQL数据库 二、添加数据到MySQL#xff08;一#xff09;通过RDD的方式插入数据到MySQL一通过RDD的方式读取MySQL数据库二通过DataFrame的方式读取MySQL数据库 二、添加数据到MySQL一通过RDD的方式插入数据到MySQL二通过RDD的方式插入数据到MySQL 2三使用DataFrame插入数据到MySQL 一、读取数据库
一通过RDD的方式读取MySQL数据库
四要素驱动、连接地址、账号密码
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD读取MySQL数据库*/
object spark_read_mysql {def main(args: Array[String]): Unit {//创建SparkSession作用连接Sparkval spark SparkSession.builder().master(local[*]) //指定运行的方式.appName(spark_read_mysql) //程序的名字.getOrCreate()//创建SparkContextval sc spark.sparkContext//驱动名称val driver com.mysql.cj.jdbc.Driver//连接信息val url jdbc:mysql://192.168.80.145:3306/test//用户名val username root//密码val password 123456//具体的SQL查询语句val sql select * from t_user where id? and id?//查询val rsRDD new JdbcRDD(sc,(){//加载驱动Class.forName(driver)//创建和MySQL数据库的连接DriverManager.getConnection(url,username,password)},//需要执行的SQL语句sql,//查询的开始行1,//查询的结束行20,//运行几个分区执行2,//返回值的处理将返回值变为RDD的元素数字从1开始表示字段的编号rs (rs.getInt(1),rs.getString(2),rs.getInt(3)))//将RDD的元素打印在终端rsRDD.collect().foreach(println)sc.stop()}
}二通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession/*** 使用DataFrame读取MySQL数据库*/
object spark_read_mysql2 {def main(args: Array[String]): Unit {//创建SparkSession作用连接Sparkval spark SparkSession.builder().master(local[*])//指定运行的方式.appName(spark_read_mysql2)//程序的名字.getOrCreate()//创建DataFrameval jdbcDF spark.read.format(jdbc).option(url,jdbc:mysql://192.168.80.145:3306/test)//指定连接.option(driver,com.mysql.cj.jdbc.Driver)//指定驱动.option(user,root)//指定连接的用户.option(password,123456)//指定连接的用户的密码.option(dbtable,t_user)//查询的表.load()//加载数据库表//在终端显示DataFrame的内容jdbcDF.show()}
}二、添加数据到MySQL
一通过RDD的方式插入数据到MySQL
每个分区执行一次创建连接和关闭连接
import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD插入数据到MySQLRDD的每个元素都会执行一次创建连接和关闭连接*/
object spark_write_mysql {def main(args: Array[String]): Unit {//创建SparkSession作用连接Sparkval spark SparkSession.builder().master(local[*]) //指定运行的方式.appName(spark_write_mysql) //程序的名字.getOrCreate()//创建SparkContextval sc spark.sparkContext//驱动名称val driver com.mysql.cj.jdbc.Driver//连接信息//?useUnicodetruecharacterEncodingUTF-8 指定连接的参数字符集为utf8防止插入的数据中文乱码val url jdbc:mysql://192.168.80.145:3306/test?useUnicodetruecharacterEncodingUTF-8//用户名val username root//密码val password 123456//创建RDDval rdd sc.makeRDD(List((zhaoba,20),(孙七,19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每条元素将元素插入MySQL一个元素执行一次创建连接和插入和关闭连接rdd.foreach {case (name,age) {//加载驱动Class.forName(driver)//创建和MySQL的链接val conn DriverManager.getConnection(url,username,password)//添加的SQL语句val sql insert into t_user(name,age) values(?,?)//给SQL语句配置参数val ps conn.prepareStatement(sql)//根据参数的类型配置参数ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()//关闭连接ps.close()conn.close()}}sc.stop()}
}二通过RDD的方式插入数据到MySQL 2
每个分区执行一次创建连接和关闭连接
import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD插入数据到MySQLRDD的每个分区执行一次创建连接和关闭连接推荐*/
object spark_write_mysql2 {def main(args: Array[String]): Unit {//创建SparkSession作用连接Sparkval spark SparkSession.builder().master(local[*]) //指定运行的方式.appName(spark_write_mysql2) //程序的名字.getOrCreate()//创建SparkContextval sc spark.sparkContext//驱动名称val driver com.mysql.cj.jdbc.Driver//连接信息//?useUnicodetruecharacterEncodingUTF-8 指定连接的参数字符集为utf8防止插入的数据中文乱码val url jdbc:mysql://192.168.80.145:3306/test?useUnicodetruecharacterEncodingUTF-8//用户名val username root//密码val password 123456//创建RDDval rdd sc.makeRDD(List((zhaoba,20),(孙七,19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每个分区将元素插入MySQL一个分区执行一次创建连接和关闭连接rdd.foreachPartition {datas {//加载驱动Class.forName(driver)//创建和MySQL的链接val conn DriverManager.getConnection(url,username,password)//添加的SQL语句val sql insert into t_user(name,age) values(?,?)//给SQL语句配置参数val ps conn.prepareStatement(sql)//根据参数的类型配置参数datas.foreach{case (name,age){ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()}}//关闭连接ps.close()conn.close()}}sc.stop()}
}三使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}/*** 使用DataFrame插入数据到MySQL*/object spark_write_mysql3 {def main(args: Array[String]): Unit {//创建SparkSession作用连接Sparkval spark SparkSession.builder().master(local[*]) //指定运行的方式.appName(spark_write_mysql3) //程序的名字.getOrCreate()//1.创建DataFrame//1.1 schemaval schema StructType(List(StructField(name, StringType,true),StructField(age,IntegerType,true)))//1.2 行rows//1.2.1 创建RDDval dataRDD spark.sparkContext.parallelize(Array(Array(李四,20),Array(王五,20)))//1.2.2 创建rowsval rows dataRDD.map(xRow(x(0),x(1)))//1.3 拼接表头schema和行内容rowsval df spark.createDataFrame(rows,schema)//2.通过DataFrame插入数据到MySQL//如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表需要注意表不能存在//df.write.mode(append)是以追加的方式将数据写入到已经存在的表中df.write.format(jdbc).option(url, jdbc:mysql://192.168.80.145:3306/test?useUnicodetruecharacterEncodingUTF-8) //指定连接.option(driver, com.mysql.cj.jdbc.Driver) //指定驱动.option(user, root) //指定连接的用户.option(password, 123456) //指定连接的用户的密码.option(dbtable, t_user2) //查询的表.save()//保存数据}
}