当前位置: 首页 > news >正文

小榄公司网站建设嘉兴网站排名优化报价

小榄公司网站建设,嘉兴网站排名优化报价,做淘宝客网站需要工商营业执照,怎么把html文件生成网址结构化流介绍 有界和无界数据 有界数据: 指的数据有固定的开始和固定的结束#xff0c;数据大小是固定。我们称之为有界数据。对于有界数据#xff0c;一般采用批处理方案#xff08;离线计算#xff09;特点#xff1a;1-数据大小是固定2-程序处理有界数据#xff0c…结构化流介绍 有界和无界数据 有界数据: 指的数据有固定的开始和固定的结束数据大小是固定。我们称之为有界数据。对于有界数据一般采用批处理方案离线计算特点1-数据大小是固定2-程序处理有界数据程序最终一定会停止无界数据: 指的数据有固定的开始但是没有固定的结束。我们称之为无界数据 对于无界数据我们一般采用流式处理方案实时计算特点1-数据没有明确的结束也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据程序会一直运行不会结束基本介绍 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作RDD是针对的有界的数据集但是为了能够兼容实时计算的处理场景提供微批处理模型本质上还是批处理只不过批与批之间的处理间隔时间变短了让我们感觉是在进行流式的计算操作目前默认的微批可以达到100毫秒一次 ​ 真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集) 实时数据案例–词频统计 需求: 代码实现: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()# 3- 数据处理result_df init_df.select(F.explode(F.split(value, )).alias(word)).groupBy(word).agg(F.count(word).alias(cnt))# init_df.show()# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(complete).start().awaitTermination()程序运行结果: 代码测试操作步骤: 首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据 yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 55555注意: 要先启动nc再启动我们的程序查看端口号是否被使用命令 netstat -nlp | grep 要查询的端口可能遇到的错误 结构化流的编程模型 数据结构 在结构化流中我们可以将DataFrame称为无界的DataFrame或者无界的二维表 数据源部分 结构化流默认提供了多种数据源从而可以支持不同的数据源的处理工作。目前提供了如下数据源 Socket Source网络套接字数据源一般用于测试。也就是从网络上消费/读取数据File Source文件数据源。读取文件系统一般用于测试。如果文件夹下发生变化有新文件产生那么就会触发程序的运行Kafka SourceKafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。Rate Source速率数据源。一般用于测试。通过配置参数由结构化流自动生成测试数据。## Operation操作 对应官网文档内容https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources File Source 将目录中写入的文件作为数据流读取支持的文件格式为text、csv、json、orc、parquet… 相关的参数: option参数描述说明maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)fileNameOnly是否检查新文件只有文件名而不是完整路径默认值false将此设置为 true 时以下文件将被视为同一个文件因为它们的文件名“dataset.txt”相同 “file:///dataset.txt” “s3://a/dataset.txt “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” 读取代码通用格式: sparksession.readStream.format(CSV|JSON|Text|Parquet|ORC...).option(参数名1,参数值1).option(参数名2,参数值2).option(参数名N,参数值N).schema(元数据信息).load(需要监听的目录地址)针对具体数据格式还有对应的简写API格式例如sparksession.readStream.csv(path需要监听的目录地址,schema元数据信息。。。)代码操作 import os from pyspark.sql import SparkSession# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(file_source)\.master(local[*])\.getOrCreate()# 2- 数据输入File Source文件数据源File Source总结1- 只能监听目录不能监听具体的文件2- 可以通过*通配符的形式监听目录中满足条件的文件3- 如果监听目录中有子目录那么无法监听到子目录的变化情况init_df spark.readStream.csv(pathfile:///export/data/,sep,,encodingUTF-8,schemaid int,name string)# 3- 数据处理# 4- 数据输出# 5- 启动流式任务init_df.writeStream.format(console).outputMode(append).start().awaitTermination()可能遇到的错误一 原因: 如果是文件数据源需要手动指定schema信息可能遇到的错误二 原因: File source只能监听目录不能监听具体文件文件数据源特点 1- 不能够监听具体的文件否则会报错误java.lang.IllegalArgumentException: Option basePath must be a directory 2- 可以通过通配符的形式来监听目录下的文件符合要求的才会被读取 3- 如果监听目录中有子目录那么无法监听到子目录的变化情况Operations操作 指的是数据处理部分该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理也可以使用DSL方式进行处理。 Sink输出操作 在结构化流中定义好DataFrame或者处理好DataFrame之后调用writeStream()方法完成数据的输出操作。在输出的过程中我们可以设置一些相关的属性然后启动结构化流程序运行。 输出模式 在进行数据输出的时候必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式 1- append模式增量模式 特点当结构化程序处理数据的时候如果有了新数据才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作直接报错。而且也不支持排序操作。如果有了排序直接报错。 2- complete模式完全全量模式 特点当结构化程序处理数据的时候每一次都是针对全量的数据进行处理。由于数据越来越多所以在数据处理阶段必须要有聚合操作。如果没有聚合操作直接报错。另外还支持排序但是不是强制要求。 3- update模式更新模式 特点支持聚合操作。当结构化程序处理数据的时候如果处理阶段没有聚合操作该模式效果和append模式是一致。如果有了聚合操作只会输出有变化和新增的内容。但是不支持排序操作如果有了排序直接报错。 append模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 正常没有聚合操作也没有排序result_df spark.sql(selectexplode(split(value, )) as wordfrom tmp_table)# 异常有聚合操作没有排序# result_df spark.sql(# select# word,count(1) as cnt# from (# select# explode(split(value, )) as word# from tmp_table# )# group by word# )# 异常没有聚合操作有排序# result_df spark.sql(# select# word# from (# select# explode(split(value, )) as word# from tmp_table# )# order by word# )# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(append).start().awaitTermination()如果有了聚合操作会报如下错误 如果有了排序操作会报如下错误 complete模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 异常没有聚合操作# result_df spark.sql(# select# explode(split(value, )) as word# from tmp_table# )# 正常有聚合操作没有排序result_df spark.sql(selectword,count(1) as cntfrom (selectexplode(split(value, )) as wordfrom tmp_table)group by wordorder by cnt)# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(complete).start().awaitTermination()如果没有聚合操作会报如下错误 update模式: import os from pyspark.sql import SparkSession import pyspark.sql.functions as F# 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(structured_streaming_wordcount)\.master(local[*])\.getOrCreate()# 2- 数据输入init_df spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()init_df.createTempView(tmp_table)# 3- 数据处理# 正常没有聚合操作result_df spark.sql(selectexplode(split(value, )) as wordfrom tmp_table)# 正常有聚合操作没有排序# result_df spark.sql(# select# word,count(1) as cnt# from (# select# explode(split(value, )) as word# from tmp_table# )# group by word# )# 异常有排序result_df spark.sql(selectwordfrom (selectexplode(split(value, )) as wordfrom tmp_table)order by word)# 4- 数据输出# 5- 启动流式任务result_df.writeStream.format(console).outputMode(update).start().awaitTermination()如果有了排序操作会报如下错误
http://www.zqtcl.cn/news/517427/

相关文章:

  • 西安app网站开发如何制作一个自己的网页
  • 陇西学做网站鄂州网约车
  • 做类似58类型网站免费源码分享
  • 个人做的网站有什么危险网站模板怎样发布
  • 设计建设网站公司网站wordpress k2
  • 公司网站被抄袭网络宣传
  • 企业网站设计收费专业网络推广公司排名
  • 视频网站模板源码深圳网站建设明细报价表
  • nike官方网站定制二级域名网站有哪些
  • 越秀移动网站建设房门户网站如何做优化
  • 什么软件可以做动漫视频网站开发一个小程序大概要多少钱
  • 微网站可以做成域名访问株洲网站做的好的公司
  • 建设网站去工信部备案需要什么资料网站建设相关博客
  • 十度网站建设网站建立的企业
  • 婚庆公司网站国外网站阻止国内访问怎么做
  • 乐山高端网站建设wordpress openload
  • 哪些网站上可以做租车深圳品牌网站开发
  • 乐清网站改版公司西安网站建设公司哪家好
  • 国外小型网站1688货源网下载
  • 浏览量最大的网站网站导航栏目设计内容依据
  • 户外拓展公司网站开发桂林网站开发
  • 怎么入侵网站后台互联网营销师含金量
  • 网站建设ningqueseo济南网站建设服务
  • 做网站给女朋友品牌网站建设只询大蝌蚪
  • 厦门服装商城网站建设米课做网站
  • ui做网站实例一起做网店网站官方
  • 网站建设合同怎么写wordpress如何设置404页面
  • wordpress 安装过程顺德网站优化
  • 大麦网网站建设的功能定位wordpress图片不被收录
  • 做推广任务的网站渠道营销推广方案