当前位置: 首页 > news >正文

tk域名注册网站阿里云最低服务器可以做几个网站

tk域名注册网站,阿里云最低服务器可以做几个网站,吉林建设教育协会网站,电子商务网站建设与管理课后习题在窗口的处理过程中#xff0c;基于数据的时间戳#xff0c;自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝#xff1b;它的时间进展#xff0c;就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记#xff0c;就被称作“水位线”#x… 在窗口的处理过程中基于数据的时间戳自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝它的时间进展就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记就被称作“水位线”Watermark。 具体实现上水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。而它插入流中的位置就应该是在某个数据到来之后这样就可以从这个数据中提取时间戳作为当前水位线的时间戳了。 有序流中水位线 1理想状态数据量小数据应该按照生成的先后顺序进入流中每条数据产生一个水位线 2实际应用中如果当前数据量非常大且同时涌来的数据时间差会非常小比如几毫秒往 往对处理计算也没什么影响。所以为了提高效率会每隔一段时间生成一个水位线。 乱序流中水位线 在分布式系统中数据在节点间传输会因为网络传输延迟的不确定性导致顺序发生改变这就是 所谓的“乱序数据”。 1乱序数据量小还是靠数据来驱动每来一个数据就提取它的时间戳、插入一个水位线。乱序数据插入新的水位线时要先判断一下时间戳是否比之前的大否则就不再生成新的水位线。也就是说只有数据的时间戳比当前时钟大才能推动时钟前进这时才插入水位线。 2乱序数据量大考虑到大量数据同时到来的处理效率可以周期性地生成水位线。这时 只需要保存一下之前所有数据中的最大时间截需要插入水位线时就直接以它作为时间戳生成新的水位线。 3乱序迟到数据无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间比如2秒也就是用当前已有数据的最大时间戳减去2秒就是要插入的水位线的时间戳。9秒的数据到来之后事件时钟不会直接推进到9秒而是进展到了7秒必须等到11秒的数据到来之后事件时钟才会进展到9秒此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长否则会对实时性会有所影响】 水位线的特性 水位线是插入到数据流中的一个标记可以认为是一个特殊的数据水位线主要的内容是一个时间戳用来表示当前事件时间的进展水位线是基于数据的时间戳生成的水位线的时间戳必须单调递增以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟来保证正确处理乱序数据一个水位线Watermark(t)表示在当前流中事件时间已经达到了时间戳t代表t之前的所 有数据都到齐了之后流中不会出现时间t≤t的数据 水位线与窗口配合完成对乱序数据的正确处理。 水位线是流处理中对低延迟和结果正确性的一个权衡机制。 水位线生成策略 生成水位线的方法.assignTimestampsAndWatermarks()主要用来为流中的数据分配时间戳并生成水位线来指示事件时间。【指定水位线生成策略】 stream.assignTimestampsAndWatermarks(watermark strategy);WatermarkStrategy 水位线策略是一个接口里面内置一些生成策略 有序流中内置水位线设置 时间戳单调增长所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps() WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy.WaterSensorforMonotonousTimestamps()// 指定时间戳分配器从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (element, recordTimestamp) - {System.out.println( 数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});乱序流中内置水位线设置 由于乱序流中需要等待迟到数据到齐必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness() WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 乱序数据等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (element, recordTimestamp) - {System.out.println( 数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});自定义水位线生成器 1周期性水位线生成器Periodic Generator 周期性生成器一般是通过 onEvent()观察判断输入的事件而在onPeriodicEmit()里发出水位线。 模仿该类BoundedOutOfOrdernessWatermarks public class CustomBoundedOutOfOrdernessGeneratorT implements WatermarkGeneratorT {private Long delayTime 5000L; // 延迟时间private Long maxTs Long.MIN_VALUE delayTime 1L; // 观察到的最大时间戳Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(eventTimestamp, maxTs); // 更新最大时间戳}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));} }在 onPeriodicEmit()里调用 output.emitWatermark()就可以发出水位线了方法由系统框架周期性地调用默认 200ms 一次。【不建议修改】 env.getConfig().setAutoWatermarkInterval(400L);2断点式水位线生成器Punctuated Generator 断点式生成器会不停地检测 onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。 如下只要有数据来就直接发射水位线 Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}3在数据源中发送水位线 可以在自定义的数据源中抽取事件时间然后发送水位线。 自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), // WatermarkStrategy.noWatermarks() 或者不发送水位线 kafkasource )水位线的传递空闲等待withIdleness 一个任务接收到多个上游并行任务传递来的水位线时应该以最小的作为当前任务的事件时钟。 如下案例当程序并行度设置为2时自定义分区器导致一个分区一直拿不到数据最小时钟一直为null此时如不加以干预事件时钟将永远不会推进存在问题。设置空闲时间当超过空闲时间一直收不到该分区数据直接忽略该分区还是会依旧推进时间时钟。 env.setParallelism(2);// 自定义分区器数据%分区数只输入奇数都只会去往map 的一个子任务SingleOutputStreamOperatorInteger socketDS env.socketTextStream(xxxx, 7777).partitionCustom(new MyPartitioner(), r - r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.IntegerforMonotonousTimestamps().withTimestampAssigner((r, ts) - r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组奇数一组偶数一组开 10s 的事件时间滚动窗口socketDS.keyBy(r - r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...迟到数据的处理 1推迟水印推进设置延迟时间 水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));2设置窗口延迟关闭 Flink 的窗口也允许迟到数据。当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。当达到设置延迟关闭时间之后才会真正关闭窗口关闭窗口后再迟到的数据就不会再处理。 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3))3使用侧流接收迟到的数据 最后兜底窗口关闭之后的迟到数据使用侧输出流输出。 完整方案 public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.设置迟到时间 3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) - element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperatorWaterSensorsensorDSWithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagWaterSensor lateTag new OutputTag(latedata, Types.POJO(WaterSensor.class));SingleOutputStreamOperatorString process sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据放入侧输出流.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd ) 包含 count 条数据 elements);}});process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();} }
http://www.zqtcl.cn/news/928323/

相关文章:

  • 在线网站排名工具跨境电商卖什么产品最赚钱
  • 电商网页设计网站什么是网络营销产生的现实基础
  • 网站开发需要注意的阿里云做网站可以免备案吗
  • 网站开发后端菜鸟教程本地安装wordpress nginx
  • 网站做端口映射域名怎么做网站
  • 港口建设征收 申报网站网站内容建设与管理
  • 长沙企业网站建设较好的公司个人社保缴费比例
  • 网站备案信息页面惠安网站建设报价
  • 东莞做微网站建设十大免费软件下载
  • 做的很好的黑白网站成都小程序开发
  • 发布做任务网站wordpress新建用户
  • 郑州市东区建设环保局官方网站工作简历模板免费下载
  • 虾皮跨境电商网站公司网站建设费计入什么费用
  • 东光有做网站的吗公司装修图片大全
  • 一个域名下多个网站项目网手游
  • 网站建设竞价托管服务wordpress搬站流程
  • 做视频网站视频文件都存放在哪室内设计网站平台
  • 外贸网站建设网合肥网站设计公
  • 网站建设设计制作 熊掌号一键生成小程序商城
  • 北滘做网站企业展厅 设计 公司 平安
  • 网站做seo外链常州营销型网站建设
  • 乐清门户网站建设网络推广关键词优化公司
  • 自己做的网站被攻击了企业展厅方案设计公司
  • 可信赖的郑州网站建设公司网站怎样实名认证
  • 创建一个网站的步骤是中国机械加工网招聘信息
  • 做电影解析网站烟台网站建设外贸
  • 做网站 网上接单汽车网站开发流程
  • 2017网站开发发展前景主页网站建设
  • 苏州手机网站建设费用上海企业制作网站
  • 网站上怎样做轮播图网站后台乱码怎么办