当前位置: 首页 > news >正文

云南省工程建设交易系统网站设计一个个人网站

云南省工程建设交易系统网站,设计一个个人网站,郑州网络推广团队,wordpress 彻底加速大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中#xff0c;我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API#xff0c;则使用了类似的结构。 source 为了方便我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API则使用了类似的结构。 source 为了方便我们依然使用from_collection从内存中读取数据。 和使用Table API类似我们给from_collection传递的第二参数是每行数据类型。本例中是String即“A C B”的类型。 from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data [A C B,A E B,E C D]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.STRING()# define the sourcesource env.from_collection(word_count_data, source_type_info)可以使用下面指令输出source内容 source.print()A C B A E B E C DMap 和上图一样Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元和生成map结构。 Splitting def split(line):for s in line.split():yield ssplitted source.flat_map(split) 上述splitted的结构输出是 A C B A E B E C DMapping Mapping的操作就是将之前的数组结构转换成map结构 mappedsplitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))mapped的输出值如下可以看到它还是按我们输入数据的顺序排列的。 (A,1) (C,1) (B,1) (A,1) (E,1) (B,1) (E,1) (C,1) (D,1)Reduce Keying 这一步对应于上图中的ShufflingSorting它会将相同key的数据进行分区以供后面reducing操作使用。 keyedmapped.key_by(lambda i: i[0]) 可以看到keyed数据已经经过排序和聚合了。 (A,1) (A,1) (B,1) (B,1) (C,1) (C,1) (D,1)Reducing reducedkeyed.reduce(lambda i, j: (i[0], i[1] j[1]))reduce的方法有如下注释 Applies a reduce transformation on the grouped data stream grouped on by the given key position. The ReduceFunction will receive input values based on the key value. Only input values with the same key will go to the same reducer. 特别是最后一句非常有用“Only input values with the same key will go to the same reducer”只有相同Key的输入数据才会进入相同的Reducer中。这句话意味着上述Keyed的数据会被分组执行于是就不会出现计算错乱。 (A,2) (B,2) (C,2) (D,1) (E,2)完整代码 from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data [A C B,A E B,E C D]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.STRING()# define the sourcesource env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted source.flat_map(split) # splitted.print()mappedsplitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyedmapped.key_by(lambda i: i[0]) # keyed.print()reducedkeyed.reduce(lambda i, j: (i[0], i[1] j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ __main__:word_count()结构 参考资料 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/
http://www.zqtcl.cn/news/578563/

相关文章:

  • 互联网站平台有哪些建筑工程教育网官网
  • 广告传媒公司哪家好职场seo是什么意思
  • 番禺龙美村做网站博山区住房和城乡建设局网站
  • 山东网站建设xywlcnwordpress如何创建导航
  • 直接用ip访问网站网站开发常用字体
  • 江西省城乡建设培训网 官方网站杭州十大软件公司
  • 建设网站需要什么设备南昌购物网站制作
  • 做家具的网站工作单位怎么填
  • 福州建设银行官网招聘网站山西建设公司网站
  • 集团网站建设方案中卫网站推广制作
  • 射阳网站建设电商运营团队结构图
  • 有没有女的做任务的网站计算机网站开发专业
  • 怎么样开始做网站网站建设 营业执照 经营范围
  • 威海做网站网站建设方案书 模版
  • 泗阳做网站南昌建设
  • 做企业网站用什么软件深圳制作企业网站
  • 大连微信网站开发兰州网站建设模板
  • 建设项目安监备案网站外贸 网站 seo
  • 企慕网站建设网络推广合肥市网站制作
  • 做空比特币网站大气简约企业网站模板免费下载
  • 坪山网站建设行业现状做网站能月入10万
  • 个人网站有什么内容广西网站建设推广
  • 安徽教育云网站建设网站seo诊断的主要内容
  • 网站建设例子开发工具宏怎么使用
  • 新乡做网站公司哪个地区网站建设好
  • 网站模板怎么编辑网站定制化
  • 利于优化的网站网络科技公司怎么赚钱
  • 制作网站的步骤和方法做物流的网站有哪些功能
  • vs做网站图片明明在文件夹里却找不到中国建筑网官网找客户信息
  • WordPress仿站培训黑龙江新闻夜航