当前位置: 首页 > news >正文

如何选择网站模板物联网设计竞赛

如何选择网站模板,物联网设计竞赛,博远手机销售管理系统app,嵌入式软件开发和硬件开发flink的定时器都是基于事件时间#xff08;event time#xff09;或事件处理时间#xff08;processing time#xff09;的变化来触发响应的。对一部分新手玩家来说#xff0c;可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解#xff0c;防止下面懵逼。…flink的定时器都是基于事件时间event time或事件处理时间processing time的变化来触发响应的。对一部分新手玩家来说可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解防止下面懵逼。简单来说事件时间就相当于人出生的时间一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间一条数据可能是2023年生成的但是到2024年才被处理这个2024年便被称为这个事件的处理时间。 一、事件时间定时器event time,这是基于事件时间来触发的这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器那么10秒后他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。 import com.xcj.flink.bean.Temperature; import com.xcj.util.DateFormat; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector;import java.time.Duration; import java.util.Date; import java.util.Iterator; import java.util.Scanner;public class EventTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature new SourceTemperature();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceTemperature tDSSource env.addSource(mySourceTemperature);WatermarkStrategyTemperature twsDS WatermarkStrategy.TemperatureforBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((temperature, recordTimestamp) - temperature.getTimestamp());SingleOutputStreamOperatorTemperature tSSODS tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStreamTemperature, String keyByDS tSSODS.keyBy(temperature - temperature.getDay());SingleOutputStreamOperatorTemperature process keyByDS.process(new ProcessFunctionTemperature, Temperature() {ListStateTemperature temperatureListState;ValueStateTemperature temperatureState;ValueStateInteger size;ValueStateLong temperature;Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptorTemperature listStateDescriptor new ListStateDescriptor(listState, Temperature.class);temperatureListState getRuntimeContext().getListState(listStateDescriptor);temperatureState getRuntimeContext().getState(new ValueStateDescriptor(temperatureState, Temperature.class));size getRuntimeContext().getState(new ValueStateDescriptor(sizeState, Integer.class));temperature getRuntimeContext().getState(new ValueStateDescriptor(temperature, Long.class));}Overridepublic void processElement(Temperature value, ProcessFunctionTemperature, Temperature.Context ctx, CollectorTemperature out) throws Exception {Temperature value1 temperatureState.value();//System.out.println(ctx.timestamp());if(value1 null){temperatureState.update(value);temperatureListState.add(value);size.update(1);//System.out.printf(当前事件处理DateFormat.getDateTime(ctx.timestamp()));//System.out.println(当前水位线DateFormat.getDateTime(ctx.timerService().currentWatermark()));temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()1000*10);}else{if(value1.getTemperature() value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()1);//System.out.println(size.value());if(size.value() 3){System.out.printf(警告警告);IteratorTemperature iterator temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteEventTimeTimer(temperature.value()1000*10);}}else{System.out.println(温度降低了);temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteEventTimeTimer(temperature.value()1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()1000*10);}}}Overridepublic void onTimer(long timestamp, ProcessFunctionTemperature, Temperature.OnTimerContext ctx, CollectorTemperature out) throws Exception {System.out.printf(时间到了清空温度DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() ! null)ctx.timerService().deleteEventTimeTimer(temperature.value() 10*1000);}});process.print(当前警告温度为);env.execute();} }//自己定义数据源class SourceTemperature extends RichSourceFunctionTemperature {Overridepublic void run(SourceContextTemperature ctx) throws Exception {Scanner scanner new Scanner(System.in);while (true) {Temperature temperature new Temperature();System.out.print(请输入温度 );//double temp Math.random()*40;double temp scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}Overridepublic void cancel() {} }//自定义实体类 class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature temperature;this.timestamp timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day 2024-12-24;public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day day;}Overridepublic String toString() {return Temperature1{ temperature temperature , timestamp timestamp , day day \ };} } 下面我们做一个测试来验证一下这个解释前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟但是时间并没有触发而是下一个事件到的时候才触发的。 二、事件处理时间事件处理时间触发有系统时间有关 package com.xcj; import com.xcj.flink.bean.Temperature; import com.xcj.util.DateFormat; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector;import java.time.Duration; import java.util.Date; import java.util.Iterator; import java.util.Scanner;public class ProcessTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature new SourceTemperature();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceTemperature tDSSource env.addSource(mySourceTemperature); // WatermarkStrategyTemperature twsDS // WatermarkStrategy.TemperatureforBoundedOutOfOrderness(Duration.ofSeconds(0)) // .withTimestampAssigner((temperature, recordTimestamp) - temperature.getTimestamp()); // // SingleOutputStreamOperatorTemperature tSSODS tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStreamTemperature, String keyByDS tDSSource.keyBy(temperature - temperature.getDay());SingleOutputStreamOperatorTemperature process keyByDS.process(new ProcessFunctionTemperature, Temperature() {ListStateTemperature temperatureListState;ValueStateTemperature temperatureState;ValueStateInteger size;ValueStateLong temperature;Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptorTemperature listStateDescriptor new ListStateDescriptor(listState, Temperature.class);temperatureListState getRuntimeContext().getListState(listStateDescriptor);temperatureState getRuntimeContext().getState(new ValueStateDescriptor(temperatureState, Temperature.class));size getRuntimeContext().getState(new ValueStateDescriptor(sizeState, Integer.class));temperature getRuntimeContext().getState(new ValueStateDescriptor(temperature, Long.class));}Overridepublic void processElement(Temperature value, ProcessFunctionTemperature, Temperature.Context ctx, CollectorTemperature out) throws Exception {Temperature value1 temperatureState.value();//System.out.println(ctx.timestamp());System.out.printf(当前事件时间DateFormat.getDateTime(value.getTimestamp()));System.out.println(当前水位线DateFormat.getDateTime(ctx.timerService().currentWatermark()));if(value1 null){temperatureState.update(value);temperatureListState.add(value);size.update(1);temperature.update(ctx.timerService().currentProcessingTime());ctx.timerService().registerProcessingTimeTimer(temperature.value()1000*10);}else{if(value1.getTemperature() value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()1);//System.out.println(size.value());if(size.value() 3){System.out.printf(警告警告);IteratorTemperature iterator temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteProcessingTimeTimer(temperature.value()1000*10);}}else{System.out.println(温度降低了);temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteProcessingTimeTimer(temperature.value()1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerProcessingTimeTimer(temperature.value()1000*10);}}}Overridepublic void onTimer(long timestamp, ProcessFunctionTemperature, Temperature.OnTimerContext ctx, CollectorTemperature out) throws Exception {System.out.printf(时间到了清空温度DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() ! null)ctx.timerService().deleteProcessingTimeTimer(temperature.value() 10*1000);}});process.print(当前警告温度为);env.execute();} }//自己定义数据源 class SourceTemperature extends RichSourceFunctionTemperature {Overridepublic void run(SourceContextTemperature ctx) throws Exception {Scanner scanner new Scanner(System.in);while (true) {Temperature temperature new Temperature();System.out.print(请输入温度 );//double temp Math.random()*40;double temp scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}Overridepublic void cancel() {} }//自定义实体类 class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature temperature;this.timestamp timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day 2024-12-24;public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day day;}Overridepublic String toString() {return Temperature1{ temperature temperature , timestamp timestamp , day day \ };} } 事件处理时间是不需要下一个事件触发的 三、总结 事件时间event time) 与事件处理时间process time定时器整体代码其实差不多主要是在注册定时器的时候选择的方法 //事件时间 ctx.timerService().registerEventTimeTimer(value.getTimestamp()); //事件处理事件 ctx.timerService().registerProcessingTimeTimer(temperature.value()1000*10); 和不同定时器的逻辑。注意事件时间定时器是需要下一个事件来触发上一个事件的定时任务但是事件处理时间定时器是不需要下一个事件来触发的他是根据注册时间和系统时间的差值来触发的。 上面我把注册时间改为了过去很久的时间来一个就触发一次定时任务因为注册时间与当前系统时间相差10秒所以会直接触发。
http://www.zqtcl.cn/news/777465/

相关文章:

  • 网页设计与网站建设期末考试wordpress文章页面图片自动适应
  • 网站建设费要交印花税吗国内ui网站
  • wordpress安装在本地专业seo网络推广
  • 农庄网站模板网络文化经营许可证图片
  • 微信做模板下载网站有哪些内容江苏省常州建设高等职业技术学校网站
  • 网站开发补充合同范本docker 部署wordpress
  • 学会了php的语法怎么做网站海外推广媒体
  • 东莞网站建设排行企业网站开发公司大全
  • wordpress商城必备软件重庆seo优化推广
  • 蚌埠百度做网站山东省无障碍网站建设标准
  • 平乡企业做网站流量精灵官网
  • 厦门做网站优化公司wordpress cx-udy
  • 做外汇门户网站WordPress推广返佣插件
  • c语言在线编程网站学生个人网页设计作品
  • 南阳网站排名优化报价wordpress视频付费
  • 政务新网站建设ipv6改造wordpress
  • 店招免费设计在线生成网站seo优化关键词快速排名上首页
  • 毕设做系统与网站答辩wordpress个人模板
  • 农家乐网站建设wordpress改变访问目录结构
  • 单位网站建设的重要性盐城城南建设局一局网站
  • 网站登录验证码显示不出来刘强东当年做网站读的什么书
  • 网站seo优化步骤动态ip可以做网站
  • 用自己电脑怎么做网站广州公司建站
  • 购物网站前端浮动特效怎么做常用开发工具
  • 网页设计与制作精品课程网站wordpress文章页禁止右键
  • 英迈思做网站做的怎样中国建设银行官方网站纪念币
  • 最专业的手机网站建设厦门建设厅网站
  • 贵州省建设工程质量检测协会网站c 网站开发类似优酷
  • 关于网站建设申请卢沟桥做网站的公司
  • 网站源码对应的数据库怎么做单页成品网站