https://spark-packages.org/ hosts many third-party data source packages; once such a package is loaded into Spark it can be used directly. The CSV format has been built in since Spark 2.0; before 2.0 it was a third-party data source.

I. Reading local external data sources

1. Reading a JSON file directly

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar

scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

This fails with:

Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
        ... 32 more

To see why, look at the source of the load method:

/**
 * Loads input in as a DataFrame, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

/**
 * Loads input in as a DataFrame, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    loadV1Source(paths: _*)
  }
}

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}

Following the chain, source defaults to the session's default data source name:

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")

So the source code shows that if no format is specified, load reads Parquet files by default. Reading an actual Parquet file therefore works:

scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
scala> users.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
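Since the default comes from the spark.sql.sources.default conf, another way to make the very first example work, besides specifying the format explicitly per read (shown next), is to change that conf for the session. A minimal sketch, assuming the same spark-shell session and file path as above and that this conf is settable at runtime in your Spark version:

// Sketch: override the session default data source, after which a plain load() reads JSON.
spark.conf.set("spark.sql.sources.default", "json")
spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show()

In practice specifying format() on each read is usually clearer than changing the session-wide default.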
To read a file in any other format, the format must be specified explicitly with format(), for example:

// Windows IDEA environment
val df1 = spark.read.format("json")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

The option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") must be included here, otherwise the job fails with:

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX

2. Reading a CSV file

The source file looks like this:

[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer

// Windows IDEA environment
val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")       // use first line of all files as header
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()

// Output
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

Without option("sep", ";"), the whole line ends up in a single column:

+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

Without option("header", "true"), the header row becomes data and the columns get default names:

+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

When reading CSV you can also define the schema yourself:

import org.apache.spark.sql.types._

val peopleschema = StructType(Array(
  StructField("hlwname", StringType, true),
  StructField("hlwage", IntegerType, true),
  StructField("hlwjob", StringType, true)))

val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")
  .schema(peopleschema)
  .load("hdfs://192.168.137.141:9000/data/people.csv")

// print to verify
df2.show()
df2.printSchema()

Output:

+-------+------+---------+
|hlwname|hlwage|   hlwjob|
+-------+------+---------+
|  Jorge|    30|Developer|
|    Bob|    32|Developer|
+-------+------+---------+

root
 |-- hlwname: string (nullable = true)
 |-- hlwage: integer (nullable = true)
 |-- hlwjob: string (nullable = true)
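As a side note, on Spark 2.3+ the same schema can also be written as a DDL-style string instead of building a StructType by hand. A minimal sketch reusing the hlw* column names from the example above (check that this overload of schema() exists in your Spark version):

// Sketch: schema supplied as a DDL string; equivalent to the StructType version above.
val df3 = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")
  .schema("hlwname STRING, hlwage INT, hlwjob STRING")
  .load("hdfs://192.168.137.141:9000/data/people.csv")
df3.printSchema()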
II. Writing the data back out in a different format

Write the users.parquet data read above back out as JSON:

scala> users.select("name", "favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")

[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop  0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}

Write the people.json data read above back out as CSV:

df1.write.format("csv")
  .mode("overwrite")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .save("hdfs://192.168.137.141:9000/data/formatconverttest/")

[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin

Note that without .option("header", "true") the written CSV loses the age,name header row, and without .option("sep", ";") the default separator is a comma.

The point of this exercise is format conversion. In production, incoming data is mostly row-oriented files such as text or JSON, and it is usually converted to columnar formats such as ORC or Parquet; combined with compression this can shrink files to roughly 10% of their original size, which greatly reduces IO and the amount of data processed and improves performance.

If you run the same save again with the same path, it fails:

scala> users.select("name", "favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")
org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  ...

This is controlled by the save mode, which defaults to errorIfExists; the other modes are append, overwrite and ignore. Setting it to overwrite makes the write succeed:

scala> users.select("name", "favorite_color").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/parquet2json/")

Author: 若泽数据—白面葫芦娃92
Original: https://www.jianshu.com/p/6fde69ea56bc