当前位置: 首页 > news >正文

建设银行信用卡去网站中国互联网协会曹伟

建设银行信用卡去网站,中国互联网协会曹伟,长沙网站维护,网络推广合作资源平台结构化流介绍 有界和无界数据 有界数据: 指的数据有固定的开始和固定的结束#xff0c;数据大小是固定。我们称之为有界数据。对于有界数据#xff0c;一般采用批处理方案#xff08;离线计算#xff09;特点#xff1a;1-数据大小是固定2-程序处理有界数据#xff0c…结构化流介绍 有界和无界数据 有界数据: 指的数据有固定的开始和固定的结束数据大小是固定。我们称之为有界数据。对于有界数据一般采用批处理方案离线计算特点1-数据大小是固定2-程序处理有界数据程序最终一定会停止无界数据: 指的数据有固定的开始但是没有固定的结束。我们称之为无界数据 对于无界数据我们一般采用流式处理方案实时计算特点1-数据没有明确的结束也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据程序会一直运行不会结束基本介绍 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作RDD是针对的有界的数据集但是为了能够兼容实时计算的处理场景提供微批处理模型本质上还是批处理只不过批与批之间的处理间隔时间变短了让我们感觉是在进行流式的计算操作目前默认的微批可以达到100毫秒一次 ​ 真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集) 实时数据案例–词频统计 需求: 代码实现: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()# 3- 数据处理result_df init_df.select(F.explode(F.split(value, )).alias(word)).groupBy(word).agg(F.count(word).alias(cnt))# init_df.show()# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(complete).start().awaitTermination()程序运行结果: 代码测试操作步骤: 首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据 yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 55555注意: 要先启动nc再启动我们的程序查看端口号是否被使用命令 netstat -nlp | grep 要查询的端口可能遇到的错误 结构化流的编程模型 数据结构 在结构化流中我们可以将DataFrame称为无界的DataFrame或者无界的二维表 数据源部分 结构化流默认提供了多种数据源从而可以支持不同的数据源的处理工作。目前提供了如下数据源 Socket Source网络套接字数据源一般用于测试。也就是从网络上消费/读取数据File Source文件数据源。读取文件系统一般用于测试。如果文件夹下发生变化有新文件产生那么就会触发程序的运行Kafka SourceKafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。Rate Source速率数据源。一般用于测试。通过配置参数由结构化流自动生成测试数据。## Operation操作 对应官网文档内容https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources File Source 将目录中写入的文件作为数据流读取支持的文件格式为text、csv、json、orc、parquet… 相关的参数: option参数描述说明maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)fileNameOnly是否检查新文件只有文件名而不是完整路径默认值false将此设置为 true 时以下文件将被视为同一个文件因为它们的文件名“dataset.txt”相同 “file:///dataset.txt” “s3://a/dataset.txt “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” 读取代码通用格式: sparksession.readStream.format(CSV|JSON|Text|Parquet|ORC...).option(参数名1,参数值1).option(参数名2,参数值2).option(参数名N,参数值N).schema(元数据信息).load(需要监听的目录地址)针对具体数据格式还有对应的简写API格式例如sparksession.readStream.csv(path需要监听的目录地址,schema元数据信息。。。)代码操作 import os from pyspark.sql import SparkSession# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(file_source)\.master(local[*])\.getOrCreate()# 2- 数据输入File Source文件数据源File Source总结1- 只能监听目录不能监听具体的文件2- 可以通过*通配符的形式监听目录中满足条件的文件3- 如果监听目录中有子目录那么无法监听到子目录的变化情况init_df spark.readStream.csv(pathfile:///export/data/,sep,,encodingUTF-8,schemaid int,name string)# 3- 数据处理# 4- 数据输出# 5- 启动流式任务init_df.writeStream.format(console).outputMode(append).start().awaitTermination()可能遇到的错误一 原因: 如果是文件数据源需要手动指定schema信息可能遇到的错误二 原因: File source只能监听目录不能监听具体文件文件数据源特点 1- 不能够监听具体的文件否则会报错误java.lang.IllegalArgumentException: Option basePath must be a directory 2- 可以通过通配符的形式来监听目录下的文件符合要求的才会被读取 3- 如果监听目录中有子目录那么无法监听到子目录的变化情况Operations操作 指的是数据处理部分该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理也可以使用DSL方式进行处理。 Sink输出操作 在结构化流中定义好DataFrame或者处理好DataFrame之后调用writeStream()方法完成数据的输出操作。在输出的过程中我们可以设置一些相关的属性然后启动结构化流程序运行。 输出模式 在进行数据输出的时候必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式 1- append模式增量模式 特点当结构化程序处理数据的时候如果有了新数据才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作直接报错。而且也不支持排序操作。如果有了排序直接报错。 2- complete模式完全全量模式 特点当结构化程序处理数据的时候每一次都是针对全量的数据进行处理。由于数据越来越多所以在数据处理阶段必须要有聚合操作。如果没有聚合操作直接报错。另外还支持排序但是不是强制要求。 3- update模式更新模式 特点支持聚合操作。当结构化程序处理数据的时候如果处理阶段没有聚合操作该模式效果和append模式是一致。如果有了聚合操作只会输出有变化和新增的内容。但是不支持排序操作如果有了排序直接报错。 append模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 正常没有聚合操作也没有排序result_df spark.sql(selectexplode(split(value, )) as wordfrom tmp_table)# 异常有聚合操作没有排序# result_df spark.sql(# select# word,count(1) as cnt# from (# select# explode(split(value, )) as word# from tmp_table# )# group by word# )# 异常没有聚合操作有排序# result_df spark.sql(# select# word# from (# select# explode(split(value, )) as word# from tmp_table# )# order by word# )# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(append).start().awaitTermination()如果有了聚合操作会报如下错误 如果有了排序操作会报如下错误 complete模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 异常没有聚合操作# result_df spark.sql(# select# explode(split(value, )) as word# from tmp_table# )# 正常有聚合操作没有排序result_df spark.sql(selectword,count(1) as cntfrom (selectexplode(split(value, )) as wordfrom tmp_table)group by wordorder by cnt)# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(complete).start().awaitTermination()如果没有聚合操作会报如下错误 update模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 正常没有聚合操作result_df spark.sql(selectexplode(split(value, )) as wordfrom tmp_table)# 正常有聚合操作没有排序# result_df spark.sql(# select# word,count(1) as cnt# from (# select# explode(split(value, )) as word# from tmp_table# )# group by word# )# 异常有排序result_df spark.sql(selectwordfrom (selectexplode(split(value, )) as wordfrom tmp_table)order by word)# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(update).start().awaitTermination()如果有了排序操作会报如下错误
http://www.zqtcl.cn/news/239890/

相关文章:

  • 以下属于网站页面设计的原则有2345网址下载
  • 为网站的特色功能设计各种模板温州网页模板建站
  • 广州微网站建设企业网站建设网站优化推广
  • 大连模板网建站项目网络的关键路径
  • 迅雷黄冈网站推广软件徐州模板自助建站
  • 怎么做不占CPU的网站修改wordpress的登陆地址
  • 网站制作毕业设计论文软件ui设计培训机构
  • 物业网站模板哪里建设网站
  • 达州城乡建设网站手机网站 方案
  • 平台兼职网站开发许昌做网站优化
  • 婴幼儿用品网站开发意义基因网站开发
  • 自己网站页面设计软件传奇世界游戏官网
  • 淘宝网网站开发部技术部三亚私人高清影院品牌加盟
  • 网站是用什么软件做的山西网络科技有限公司
  • 汕头网站建设开发做购物网站 营业范围是什么
  • 网站建设 企业短视频运营计划书
  • 网站仿静态网站城市分站织梦系统
  • 淄博网站建设高端企业最新商业资讯
  • 百度推广太原网站建设wordpress的页面和首页一样
  • 无为网站定制php网站 mysql数据库配置文件
  • 如何利用div做网站wordpress替换百度站内搜索
  • 大德通网站建设互动营销网站
  • 网站建设与管理实训主要内容响应式网站建设智能优化
  • 佛山市企业网站建设报价网站建
  • 广州网站营销推广设计孝义网站开发
  • 新站网站如何做Seo那个网站点击率高
  • 个体做外贸的网站罗浮视窗网站建设
  • 产品企业网站上海关键词排名优化公司
  • 网站APP推广东莞人才招聘网58
  • 惠州网站建设哪家好建筑网站建设方案