当前位置: 首页 > news >正文

石家庄网站建设就找企行家怎样做有效的黄页网站

石家庄网站建设就找企行家,怎样做有效的黄页网站,杭州网站的建设,网站没有流量怎么办一、双流JOIN 在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 二、Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会…一、双流JOIN 在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 二、Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了) 2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。 Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join 滚动窗口Join代码段示例 package com.lyh.flink12; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;/*** Author lizhenchaoatguigu.cn* Date 2021/1/24 22:09*/ public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperatorWaterSensor s1 env.socketTextStream(hadoop100, 8888) // 在socket终端只输入毫秒级别的时间戳.map(value - {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperatorWaterSensor s2 env.socketTextStream(hadoop100, 9999) // 在socket终端只输入毫秒级别的时间戳.map(value - {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunctionWaterSensor, WaterSensor, String() {Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return first: first , second: second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}} }运行结果 三、Interval Join 间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据. 如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time lower bound和event-time upper bound来决定的. orangeElem.ts lowerBound greenElem.ts orangeElem.ts upperBound Interval Join只支持event-time 必须是keyBy之后的流才可以interval join package com.lyh.flink12; import com.lyh.bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.planner.expressions.In; import org.apache.flink.util.Collector; import java.time.Duration;public class Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperatorWaterSensor s1 env.socketTextStream(hadoop100, 8888).map(value - {String[] data value.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperatorWaterSensor s2 env.socketTextStream(hadoop100, 9999).map(value - {String[] data value.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunctionWaterSensor, WaterSensor, String() {Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,CollectorString out) throws Exception {out.collect(left , right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}运行结果
http://www.zqtcl.cn/news/355885/

相关文章:

  • 做网站代理国内课程网站建设现状
  • 中国建设银行手机网站下载从零开始建设企业网站
  • 网站友情链接怎么弄seo平台
  • 建设网站一定要备案吗嘉兴做网站设计
  • 如何制作营销网站模板做外贸需要关注的网站有什么好处
  • 东莞勒流网站制作wordpress 自定义字段 查询
  • 温州网站开发风格做影视剧组演员垂直平台网站
  • c 网站开发培训怎么做网站的站点地图
  • html 网站模板简单网站制作北京海淀
  • 大庆做网站找谁珠海网站搭建
  • 网站建设方面的外文宿迁房产网找房
  • 运营 网站遵义网站开发制作公司
  • 动力论坛源码网站后台地址是什么网站上微信支付功能
  • 网站需求分析模板深圳3d制作
  • 宿迁网站建设推广公司wordpress忘记密码了
  • 成都双语网站开发flat wordpress
  • 大连做公司网站的公司网络营销的网站
  • 做网站 人工智能怎么做商业服务网站
  • 自助建站公司四平市住房和城乡建设部网站
  • 淄博网站seo价格世界新闻最新消息
  • 网站开发 毕业答辩pptwordpress qq邮箱订阅
  • 国家icp备案网站群辉域名登录wordpress
  • 仙居住房和城乡建设规划局网站可以做思维导图的网站
  • 企业网站建设费怎么入账石家庄定制网站建设服务
  • 遂宁建设网站如何搭建微信公众号平台
  • 咖啡网站源码公司网站手机版
  • 新能源网站开发网站做5级分销合法吗
  • 西安建设网站排名简约风网站首页怎么做
  • 安远做网站做服务网站要多少钱
  • 功能网站模板电商平台项目商业计划书