做网站单页视频,网站开发多久,舟山网站建设哪家好,做网站申请什么商标背景
开发时遇到一个较为复杂的周期需求#xff0c;为了适配读取各种数据库中的数据并将数据库数据转换为DataFrame并进行后续的开发分析工作#xff0c;做了如下代码。 在爷们开发这段生产中的代码#xff0c;可适配mysql,hive,hbase#xff0c;gbase等等…背景
开发时遇到一个较为复杂的周期需求为了适配读取各种数据库中的数据并将数据库数据转换为DataFrame并进行后续的开发分析工作做了如下代码。 在爷们开发这段生产中的代码可适配mysql,hive,hbasegbase等等等等基本涉及到数据库的情况基本可以进行。可以说是非常之NB了
数据流程
由于该代码片段主要关注数据处理流程而非实际数据内容,当然我也不能把特殊数据给大家展示后面有时间再造一批test数据吧因此没有提供样例数据。不过可以根据实际使用的数据库和表结构提供相应的样例数据以供测试和验证。
具体的数据自己造就行了咱这段逻辑时经过大数据量考验的
代码v1
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions.{udf,col,date_format}
import scala.util.matching.Regex//2024,lee研发适配,可以说这段代码的通用性非常高非常NB
//time:202401GZ
// https://blog.csdn.net/qq_52128187?typeblog
//获取数据库中的数据并转为dataframe,可以使hbase也可以是mysql
val table数据库传出的数据
val dfininputRDD(table).asInstanceOf[org.apache.spark.sql.DataFrame]
dfin.createOrReplaceTempView(s$table)
dfin.show(3)
val sql_table sql条件
val sql inputRDD(sql_table).asInstanceOf[String]
println(打印前序导出的sql: sql)//正则结合sql与df,获取最终数据
val regex new Regex((?!)from\\s[^\\s])
val actualSql regex.replaceFirstIn(sql,sfrom ${table})
println(打印最终sql:actualSql)//解析sql
val resultDf spark.sql(actualSql)
resultDf.show(10,false)
代码V2
在另一个环境测试时上面的代码运行时出现了一个bug如下
org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table all_beforexxxxx.Exception thrown when executing query :SELECT DISTINCT org.apache.hadoop.hive.metastore.model.MTable AS
NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.OWNER,A0.OWNER_TYPE,
A0.RETENTION,A0.TBL_NAME,A0.TBL_TYPE,A0.TBL_ID FROM TBLS A0
LEFT OUTER JOIN DBS B0 ON A0.DB_ID B0.DB_ID
WHERE A0.TBL_NAME ? AND B0.NAME ?; 问题描述详细排查了一下是由于解析sql语句时出现了一个bug,但是我在另一个环境这样写是可以解析的神奇哦 问题解决我是如何解决的呢一看就是解析表的时候出现问题定位代码是正则表达式的问题。修改后的代码
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions.{udf,col,date_format}
import scala.util.matching.Regex//2024
//2024,lee研发适配,可以说这段代码的通用性非常高非常NB
//time:202401GZ
// https://blog.csdn.net/qq_52128187?typeblog
//获取数据库中的数据并转为dataframe,可以使hbase也可以是mysql
val table数据_tegeXNph
val dfininputRDD(table).asInstanceOf[org.apache.spark.sql.DataFrame]
dfin.createOrReplaceTempView(s$table)
dfin.show(3)//获取sql语句
val sql_table sql条件导出_周期
val sql inputRDD(sql_table).asInstanceOf[String]
println(打印前序导出的sql: sql)//正则结合sql与df,获取最终数据
// val regex new Regex((?!)from\\s[^\\s])会报错
//org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table all_before_xxx. Exception thrown when executing query : SELECT DISTINCT org.apache.hadoop.hive.metastore.model.MTable AS NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.OWNER,A0.OWNER_TYPE,A0.RETENTION,A0.TBL_NAME,A0.TBL_TYPE,A0.TBL_ID FROM TBLS A0 LEFT OUTER JOIN DBS B0 ON A0.DB_ID B0.DB_ID WHERE A0.TBL_NAME ? AND B0.NAME ?;val regex new Regex(from\\s(\\S)) // 做了排查bug修改修改后的正则表达式
val actualSql regex.replaceFirstIn(sql,sfrom ${table})
println(打印最终sql:actualSql)//解析sql
val resultDf spark.sql(actualSql)
resultDf.show(10,false)