当前位置: 首页 > news >正文

如何做好网站优化北京网站开发周期

如何做好网站优化,北京网站开发周期,百度ocpc如何优化,怎么用ppt做网站设计目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式…目录 1、关于时间语义 1.1事件时间 1.2处理时间​编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式水位线生成器Punctuated Generator 3.4.3 在数据源中发送水位线 4、水位线的传递 5、迟到数据的处理 1、关于时间语义 1.1事件时间 一般情况下业务日志数据中都会记录数据生成的时间戳timestamp它就可以作为事件时间的判断基础。从Flink1.12版本开始Flink已经将事件时间作为默认的时间语义了。 1.2处理时间 2、什么是水位线 在Flink中用来衡量事件时间进展的标记就被称作“水位线”Watermark。说白了就是事件时间戳。 2.1 顺序流和乱序流 有序流就是指数据按照生成的先后顺序每条数据产生一个有先后顺序的水位线 这是一种理想的状态数据量较小而在实际中我们产生的数据量往往非常庞大而数据之间的时间间隔非常之小所以为了提高效率一般会每隔一段时间生成一个水位线。 在实际生产中由于多服务之间网络传输等的因素往往我们的数据流并不是我们所想的顺序结果而是数据先后错乱这就是乱序流。 2.2乱序数据的处理 由于数据是乱序的我们无法正确处理“迟到”的数据为了让窗口能够正确的收集到迟到的数据我们也可以让窗口等上一段时间比如2秒。也就是说我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。 2.3 水位线的特性 3 3 、水位线的生成 3.1 生成水位线的总体原则 完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确就必须等足够长的时间这会带来更高的延迟。 如果我们希望处理得更快、实时性更强那么可以将水位线延迟设得低一些。这种情况下可能很多迟到数据会在水位线之后才到达就会导致窗口遗漏数据计算结果不准确。当然如果我们对准确性完全不考虑、一味地追求处理速度可以直接使用处理时间语义这在理论上可以得到最低的延迟。 所以Flink中的水位线其实是流处理中对低延迟和结果正确性的一个权衡机制而且把控制的权力交给了程序员我们可以在代码中定义水位线的生成策略。 3.2 水位线生成策略 在Flink的DataStream API中有一个单独用于生成水位线的方法.assignTimestampsAndWatermarks()它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间。 DataStreamEvent stream env.addSource(new ClickSource());DataStreamEvent withTimestampsAndWatermarks stream.assignTimestampsAndWatermarks(watermark strategy); WatermarkStrategy作为参数这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。 public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{// 负责从流中数据元素的某个字段中提取时间戳并分配给元素。时间戳的分配是生成水位线的基础。OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式基于时间戳生成水位线OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); } 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 对于有序流主要特点就是时间戳单调增长所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。 public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();} } 3.3.2 乱序流中内置水位线设置 调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。 这个方法需要传入一个maxOutOfOrderness参数表示“最大乱序程度”它表示数据流中乱序数据时间戳的最大差值 public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱序的等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();} } 3.4 自定义水位线生成器 3.4.1 周期性水位线生成器Periodic Generator 周期性生成器一般是通过onEvent()观察判断输入的事件而在onPeriodicEmit()里发出水位线。 import com.atguigu.bean.Event; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生 public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategyEvent {Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event elementlong recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}} } 如果想修改默认周期时间可以通过下面方法修改。 //修改默认周期为400ms env.getConfig().setAutoWatermarkInterval(400L); 3.4.2 断点式水位线生成器Punctuated Generator 断点式生成器会不停地检测onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。 3.4.3 在数据源中发送水位线 我们也可以在自定义的数据源中抽取事件时间然后发送水位线。这里要注意的是在自定义数据源中发送了水位线以后就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource ) 4、水位线的传递 在流处理中上游任务处理完水位线、时钟改变之后要把当前的水位线再次发出广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时应该以最小的那个作为当前任务的事件时钟。 水位线在上下游任务之间的传递非常巧妙地避免了分布式系统中没有统一时钟的问题每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。 也就是说水位线的传递是以最小事件时间为准则。 5、迟到数据的处理 5.1 推迟水印推进 在水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); 5.2 设置窗口延迟关闭 当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间推迟时间此时窗口会真正关闭。 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) 5.3 使用侧流接收迟到的数据 .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateWS) 完整示例 public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) - element.getTs() * 1000L);SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagWaterSensor lateTag new OutputTag(late-data, Types.POJO(WaterSensor.class));SingleOutputStreamOperatorString process sensorDSwithWatermark.keyBy(sensor - sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据放入侧输出流.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();} }
http://www.zqtcl.cn/news/559658/

相关文章:

  • 哪个网站可以做英语语法题智慧云建筑信息平台
  • 网站怎么做百度才会收录金乡县网站开发
  • 深圳移动网站建站网站如何做播放线路
  • 深圳网站建设q.479185700惠哪个网站可以免费设计房子
  • 迁西网站开发网站建设技术网站建
  • 网站建设与管理课程报告能够做外贸的网站有哪些
  • 浅析社区网站的建设如何建立企业网站
  • 网站建设尺寸像素是多少广州商城型网站建设
  • 重庆自助建站模板简述网络营销的特点
  • 企业网站托管一个月多少钱网页设计规范2018
  • 网站建设费用摊销会计分录合肥网站建设哪里好
  • 郑州市建设工程造价信息网站关于工程项目建设的网站
  • 网站做淘宝客收入咋样景区门户网站建设方案
  • 遵义做网站推广西安都有哪些公司
  • 万网建网站流程产品展示网站模板php
  • 新津县建设局网站网站做301
  • 网站域名续费如何建设一个简易网站
  • 网站整体迁移该怎么做wordpress 图片调用api接口
  • 网站获得流量最好的方法是什么 ( )汕头建设学校的网站
  • 网上下载的网站后台安全吗仿系统之家网站源码
  • 网站实名审核高等教材电工学久久建筑网
  • 化学试剂购买网站网站节点加速
  • 桂林城乡建设局网站在线咨询免费
  • 长治网站设计制作网站ps怎么做网站导航内嵌式
  • 网站 橙色前台网站开发
  • 滨海网站建设服务商电子商务网站建设与维护pdf
  • 企业网站建设方案效果h5网页制作app
  • 国内搜索引擎网站免费无线
  • 龙岩做网站价格室内建筑设计
  • 闲鱼上面给人做网站造退款微信登录建设银行网站