当前位置: 首页 > news >正文

档案网站建设文献综述企拓客软件多少钱

档案网站建设文献综述,企拓客软件多少钱,做电影网站如何盈利,网站开发界面设计用什么工具复杂事件处理CEP Flink CEP基本使用添加依赖定义匹配模式定义匹配结果验证 模式Pattern API单个模式量词条件 组合模式跳过策略模式组匹配结果 应用示例自定义消息事件自定义Pattern测试 Flink CEP Flink的CEP (Complex Event Processing) 是指Flink提供的一种用于处理复杂事件… 复杂事件处理CEP Flink CEP基本使用添加依赖定义匹配模式定义匹配结果验证 模式Pattern API单个模式量词条件 组合模式跳过策略模式组匹配结果 应用示例自定义消息事件自定义Pattern测试 Flink CEP Flink的CEP (Complex Event Processing) 是指Flink提供的一种用于处理复杂事件序列的库。复杂事件通常由多个简单事件组成这些简单事件在特定的时间窗口内以特定的顺序发生。CEP可以用于检测和识别这些复杂事件并根据预定义的模式进行操作和处理。 Flink的CEP库提供了一个灵活而强大的编程模型使用户能够指定不同事件之间的关系模式并定义事件触发的条件。它能够处理基于时间、顺序和其他属性的复杂事件模式并支持流式处理和实时数据。CEP可以用于构建基于事件的应用程序例如金融交易监控、网络流量分析、IoT数据处理等。 应用场景 Flink CEPComplex Event Processing是针对处理数据流中复杂事件模式的技术适用于多种实时数据处理场景其中包括 金融交易监控实时监控金融交易数据流以识别潜在的欺诈行为例如检测异常的交易序列或者异常的资金流动模式。网络安全分析对实时网络日志进行分析以检测网络攻击、异常行为或者安全威胁例如识别特定攻击模式或异常的网络通信序列。物联网IoT数据处理处理来自传感器和设备的实时数据以识别设备故障、异常事件或者预测维护需求例如发现特定的设备状态序列暗示了潜在的问题。市场营销和个性化推荐分析客户实时行为数据识别特定的购买模式或者行为序列以提供个性化的产品推荐或市场营销策略。生产流程监控监控工业生产线上的传感器和生产数据以检测生产异常、预测设备故障或者优化生产调度。医疗健康监控实时监控病人健康数据或医疗设备数据以检测潜在的健康危机、预测病情变化或者提供实时的健康监控服务。基本使用 添加依赖 将Flink CEP依赖项添加到pom.xml中 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-cep/artifactIdversion${flink.version}/version/dependency 定义匹配模式 DataStream要应用模式匹配的 事件必须实现正确的equals()和hash Code()方法因为 Flink CEP 使用它们来比较和匹配事件。 public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 准备事件流DataStreamTuple2String, Integer inputEventStream env.fromElements(new Tuple2(event, 1),new Tuple2(event, 2),new Tuple2(event, 3),new Tuple2(event, 4),new Tuple2(event, 5),new Tuple2(event, 6),new Tuple2(event, 7),new Tuple2(event, 8)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer event, long recordTimestamp) {return event.f1 * 1000;}})).keyBy(event - event.f0);/*** 定义复杂事件处理模式* 先匹配元素是偶数的事件然后匹配元素3的事件然后继续匹配元素是8的元素*/// 声明并初始化一个模式用于表示要在事件流中检测的模式。这个模式匹配的是一个包含String和Integer类型元素的元组. begin(start) 来定义模式的起始点PatternTuple2String, Integer, ? pattern Pattern.Tuple2String, Integerbegin(start)// 对模式的起始点应用条件条件是一个简单的过滤条件: 事件的第二个元素是偶数。.where(new SimpleConditionTuple2String, Integer() {Overridepublic boolean filter(Tuple2String, Integer event) {return event.f1 % 2 0;}})// 定义了紧接在起始点后的第一个元素命名为 middle.next(middle)// 这个元素是一个Tuple2类型的子类型.subtype(Tuple2.class)// 对第二个元素应用了迭代条件这里使用了一个迭代条件IterativeCondition来检查第二个元素是否为奇数.where(new SimpleConditionTuple2() {Overridepublic boolean filter(Tuple2 event) {// return (Integer) event.f1 5;return (Integer) event.f1 3;}})// 规定了前面定义的模式必须发生N次.times(2)// 定义了这N次发生必须是连续的.consecutive()// 定义了在之后紧跟的元素命名为 end用于表示模式的结束.followedBy(end)// 对模式的结束点应用了一个简单的条件确保事件的第二个元素等于8.where(SimpleCondition.of((Tuple2String, Integer event) - event.f1 8)).within(Time.seconds(5));// 在事件流上应用模式PatternStreamTuple2String, Integer patternStream CEP.pattern(inputEventStream.keyBy(event - event.f0), pattern);// 选择匹配结果并输出// DataStreamString result patternStream.select(new MyPatternSelectFunction());DataStreamString result patternStream.process(new MyPatternProcessFunction());result.print();// 执行任务env.execute(CEP Example);} 定义匹配结果 /*** PatternSelectFunction定义匹配结果的处理函数*/public static class MyPatternSelectFunction implements PatternSelectFunctionTuple2String, Integer, String {Overridepublic String select(MapString, ListTuple2String, Integer pattern) throws Exception {StringBuilder builder new StringBuilder();builder.append(找到匹配项: );pattern.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));return builder.toString();}}/*** PatternProcessFunction定义匹配结果的处理函数*/public static class MyPatternProcessFunction extends PatternProcessFunctionTuple2String, Integer, String {Overridepublic void processMatch(MapString, ListTuple2String, Integer pattern, Context context, CollectorString collector) throws Exception {StringBuilder builder new StringBuilder();builder.append(找到匹配项: );pattern.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));collector.collect(builder.toString());}}验证 1 找到匹配项: start [(event,4)]; middle [(event,5), (event,6)]; end [(event,8)]; 模式Pattern API 模式 API允许定义要从输入流中提取的复杂模式序列 每个复杂模式序列由多个简单模式组成即寻找具有相同属性的单个事件的模式每个模式必须有一个唯一的名称可以使用该名称来标识匹配的事件模式名称不能包含字符:单个模式 复杂规则中的每一个单独的模式定义就是个体模式。我们既可以定义一个给定事件出现的次数量词也可以定义一个条件来决定一个进来的事件是否被接受进入这个模式条件。 量词 默认情况下模式是单例模式可以使用量词将其转换为循环模式。 API说明pattern.oneOrMore()模式发生1次或N次pattern.times(#ofTimes)发生一次或多次的模式pattern.times(#fromTimes, #toTimes)出现特定次数的模式pattern.greedy()模式变得贪婪匹配越多越好pattern.optional()模式可以不匹配 使用示例 // 期望出现4次 pattern.times(4);// 期望出现0次或者4次 pattern.times(4).optional();// 期望出现2次、3次或者4次 pattern.times(2, 4);// 期望出现2次、3次或者4次尽可能多地重复 pattern.times(2, 4).greedy();// 期望出现0次、2次、3次或者4次 pattern.times(2, 4).optional();// 期望出现0次、2次、3次或者4次尽可能多地重复 pattern.times(2, 4).optional().greedy();// 期望出现1次或者更多次 pattern.oneOrMore();// 期望出现1次或者更多次尽可能多地重复 pattern.oneOrMore().greedy();// 期望出现0次或者更多次 pattern.oneOrMore().optional();// 期望出现0次或者更多次尽可能多地重复 pattern.oneOrMore().optional().greedy();// 期望出现2次或者更多次 pattern.timesOrMore(2);// 期望出现2次或者更多次尽可能多地重复 pattern.timesOrMore(2).greedy();// 期望出现0次、2次或者更多次 pattern.timesOrMore(2).optional()// 期望出现0次、2次或者更多次尽可能多地重复 pattern.timesOrMore(2).optional().greedy();条件 对于每个模式可以指定传入事件必须满足的条件才能被“接受”到模式中 API描述示例说明pattern.where()定义当前模式的条件。为了匹配模式事件必须满足条件。多个连续的 where() 子句会导致它们的条件被AND编辑pattern.where(SimpleCondition.of((Tuple2String, Integer event) - event.f1 8))匹配f18pattern.or()添加一个与现有条件相结合的新条件OR。仅当事件至少满足其中一个条件时它才能与模式匹配pattern.where(SimpleCondition.of((Tuple2String, Integer event) - event.f1 1)).or(SimpleCondition.of((Tuple2String, Integer event) - event.f1 2))匹配f1 1 或者 f12pattern.until()指定循环模式的停止条件。如果发生与给定条件匹配的事件则该模式将不再接受任何事件。仅适用于结合oneOrMore() NOTE:它允许在基于事件的条件下清除相应模式的状态pattern.until(SimpleCondition.of((Tuple2String, Integer event) - event.f1 2))匹配1次或多次直到f12 如果名称以foo开头则接受名为middle的模式的下一个事件并且如果该模式先前接受的事件的价格之和加上当前事件的价格事件不超过5.0的值 middle.oneOrMore().subtype(SubEvent.class).where(new IterativeConditionSubEvent() {Overridepublic boolean filter(SubEvent value, ContextSubEvent ctx) throws Exception {if (!value.getName().startsWith(foo)) {return false;}double sum value.getPrice();for (Event event : ctx.getEventsForPattern(middle)) {sum event.getPrice();}return Double.compare(sum, 5.0) 0;}});组合模式 把很多单个模式组合起来就形成了组合模式。Flink CEP支持事件之间如下形式的连续策略 严格连续性期望所有匹配事件严格一个接一个地出现中间没有任何不匹配的事件。宽松连续性忽略匹配事件之间出现的不匹配事件。非确定性宽松连续性进一步放松连续性允许忽略某些匹配事件的其他匹配。要在连续模式之间应用它们可以使用 next()对于严格的 followedBy()对于宽松的 followedByAny()对于非确定性松弛连续性 notNext()如果您不希望某个事件类型直接跟随另一个事件类型 notFollowedBy()如果您不希望某个事件类型介于其他两个事件类型之间模式序列必须以初始模式开始 PatternEvent, ? start Pattern.Eventbegin(start);PatternEvent, ? start Pattern.Eventbegin(Pattern.Eventbegin(start).where(...).followedBy(middle).where(...) );API说明示例begin(#name)定义起始模式PatternEvent, ? start Pattern.begin(“start”);begin(#pattern_sequence)定义起始模式Pattern.begin(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));next(#name)附加新模式。匹配事件必须直接继承前一个匹配事件严格连续性PatternEvent, ? next start.next(“middle”)next(#pattern_sequence)附加新模式。一系列匹配事件必须直接接续前一个匹配事件严格连续性start.next(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));followedBy(#name)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间宽松的连续性PatternEvent, ? followedBy start.followedBy(“middle”);followedBy(#pattern_sequence)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间宽松的连续性start.followedBy(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));followedByAny(#name)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间并且将为每个替代匹配事件呈现替代匹配非确定性宽松连续性PatternEvent, ? followedByAny start.followedByAny(“middle”);followedByAny(#pattern_sequence)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间并且将为每个替代匹配事件呈现替代匹配非确定性宽松连续性start.next(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));notNext()附加新的否定模式。匹配否定事件必须直接继承前一个匹配事件严格连续性才能丢弃部分匹配PatternEvent, ? notNext start.notNext(“not”);notFollowedBy()附加新的否定模式。即使匹配负事件和前一个匹配事件宽松连续性之间发生其他事件部分匹配的事件序列也将被丢弃PatternEvent, ? notFollowedBy start.notFollowedBy(“not”);within(time)定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间则将其丢弃pattern.within(Time.seconds(10)); 使用示例 // 严格的连续性模式 PatternEvent, ? strict start.next(middle).where(...);// 宽松的连续性模式 PatternEvent, ? relaxed start.followedBy(middle).where(...);// 非确定性的宽松连续性模式 PatternEvent, ? nonDetermin start.followedByAny(middle).where(...);// 使用严格连续性的NOT模式 PatternEvent, ? strictNot start.notNext(not).where(...);// 使用宽松连续性的NOT模式 PatternEvent, ? relaxedNot start.notFollowedBy(not).where(...);跳过策略 对于给定的模式同一事件可以分配给多个成功的匹配。要控制一个事件将分配多少个匹配项需要指定跳过策略AfterMatchSkipStrategy 跳跃策略有五种类型 API说明AfterMatchSkipStrategy.noSkip()创建NO_SKIP跳过策略AfterMatchSkipStrategy.skipToNext()创建SKIP_TO_NEXT跳过策略AfterMatchSkipStrategy.skipPastLastEvent()创建SKIP_PAST_LAST_EVENT跳过策略AfterMatchSkipStrategy.skipToFirst(patternName)使用引用的模式名称patternName创建SKIP_TO_FIRST跳过策略AfterMatchSkipStrategy.skipToLast(patternName)使用引用的模式名称patternName创建SKIP_TO_LAST跳过策略 注意 当使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时还应指定有效的PatternName SkipToFirstStrategy skipToFirstStrategy AfterMatchSkipStrategy.skipToFirst(patternName); Pattern.begin(patternName, skipToFirstStrategy);模式组 将一个模式作为条件嵌套在单个模式里就是模式组。 PatternEvent, ? start Pattern.begin( Pattern.begin(start).where(...).followedBy(start_middle).where(...) );// 严格的连续性模式 PatternEvent, ? strict start.next( Pattern.begin(next_start).where(...).followedBy(next_middle).where(...) ).times(3);// 宽松的连续性模式 PatternEvent, ? relaxed start.followedBy( Pattern.begin(followedby_start).where(...).followedBy(followedby_middle).where(...) ).oneOrMore();// 非确定性的宽松连续性模式 PatternEvent, ? nonDetermin start.followedByAny( Pattern.begin(followedbyany_start).where(...).followedBy(followedbyany_middle).where(...) ).optional();匹配结果 指定要查找的模式序列后就可以将其应用到输入流以检测潜在的匹配项。 要针对模式序列运行事件流必须创建一个PatternStream. 给定一个输入流input、一个模式pattern和一个可选的比较器comparator用于对具有相同时间戳的事件在 EventTime的情况下或在同一时刻到达进行排序 可以使用PatternProcessFunction、也可以使用旧式API例如PatternSelectFunction 1.PatternProcessFunction PatternProcessFunction有一个processMatch为每个匹配事件序列调用的方法。 PatternStreamEvent patternStream CEP.pattern(input, pattern, comparator);public static class MyPatternProcessFunction extends PatternProcessFunctionTuple2String, Integer, String {/**** param pattern MapString, ListIN其中键是模式序列中每个模式的名称值是该模式的所有已接受事件的列表IN是输入元素的类型*/Overridepublic void processMatch(MapString, ListTuple2String, Integer pattern, Context context, CollectorString collector) throws Exception {StringBuilder builder new StringBuilder();builder.append(找到匹配项: );pattern.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));collector.collect(builder.toString());}}使用 DataStreamString result patternStream.process(new MyPatternProcessFunction());2.TimedOutPartialMatchHandler 每当模式具有通过within关键字附加的窗口长度时部分事件序列就有可能被丢弃因为它们超出了窗口长度。要对超时的部分匹配采取行动可以使用TimedOutPartialMatchHandler接口。 TimedOutPartialMatchHandler提供了额外的processTimedOutMatch方法每次超时的部分匹配都会调用该方法。 public static class MyPatternProcessFunction extends PatternProcessFunctionTuple2String, Integer, String implements TimedOutPartialMatchHandlerTuple2String, Integer {/**** param pattern MapString, ListIN其中键是模式序列中每个模式的名称值是该模式的所有已接受事件的列表IN是输入元素的类型*/Overridepublic void processMatch(MapString, ListTuple2String, Integer pattern, Context context, CollectorString collector) throws Exception {StringBuilder builder new StringBuilder();builder.append(找到匹配项: );pattern.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));collector.collect(builder.toString());}Overridepublic void processTimedOutMatch(MapString, ListTuple2String, Integer map, Context context) throws Exception {StringBuilder builder new StringBuilder();builder.append(处理超时的部分模式: );map.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));System.out.println(builder.toString());}}PatternSelectFunction public static class MyPatternSelectFunction implements PatternSelectFunctionTuple2String, Integer, String {Overridepublic String select(MapString, ListTuple2String, Integer pattern) throws Exception {StringBuilder builder new StringBuilder();builder.append(找到匹配项: );pattern.forEach((key, value) - builder.append(key).append( ).append(value).append(; ));return builder.toString();}}使用 DataStreamString result patternStream.select(new MyPatternSelectFunction());应用示例 模拟查找匹配5秒钟内连续登录失败在3次以上的用户 自定义消息事件 Data AllArgsConstructor NoArgsConstructor public class LoginEvent {/*** 用户id*/private Integer uid;/*** 是否登录成功*/private Boolean success;/*** 时间戳*/private Long timeStamp; }自定义Pattern public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamLoginEvent streamSource env.fromElements(new LoginEvent(1, false, 1000L),new LoginEvent(2, true, 2000L),new LoginEvent(3, true, 3000L),new LoginEvent(1, false, 4000L),new LoginEvent(1, false, 5000L),new LoginEvent(4, false, 5000L)).assignTimestampsAndWatermarks(WatermarkStrategy.LoginEventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerLoginEvent() {Overridepublic long extractTimestamp(LoginEvent loginEvent, long l) {return loginEvent.getTimeStamp();}})).keyBy(r - r.getUid());PatternLoginEvent, LoginEvent pattern Pattern.LoginEventbegin(first).where(new SimpleConditionLoginEvent() {Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).next(second).where(new SimpleConditionLoginEvent() {Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).next(third).where(new SimpleConditionLoginEvent() {Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).within(Time.seconds(5));PatternStreamLoginEvent patternedStream CEP.pattern(streamSource, pattern);patternedStream.select(new PatternSelectFunctionLoginEvent, String() {Overridepublic String select(MapString, ListLoginEvent map) throws Exception {LoginEvent first map.get(first).iterator().next();LoginEvent second map.get(second).iterator().next();LoginEvent third map.get(third).iterator().next();return String.format(uid:%d 连续3次登录失败登录时间 first:%d, second:%d, third:%d, first.getUid(), first.getTimeStamp(), second.getTimeStamp(), third.getTimeStamp());}}).print();env.execute();}测试 uid:1 5秒钟内连续3次登录失败登录时间 first:1000, second:4000, third:5000
http://www.zqtcl.cn/news/628900/

相关文章:

  • 免费网站商城模板宁波企业网站搭建图片
  • 上海网站备案查询建站图标素材
  • 贵州省住房和建设厅网网站网站页面设计报告
  • 做网站友汇网快速建设网站视频教程
  • 物流公司做网站注重什么官网的网站设计公司
  • 网站备案 2016电子商务平台起名
  • 济南建站详情房地产市场分析
  • 南宁品牌网站建设公司中国商业企业网
  • 建设招标网官方网站电脑版做系统简单还是网站简单
  • 网站平台建设总结品牌网页
  • 网站建设如何就接入支付宝企业云平台
  • swoole做网站做网站建设的上市公司有哪些
  • 建设银行江苏官网招聘网站网站设置首页连接分类页的视频教程
  • 通过dede访问自己做的网站高端 建站
  • wordpress自定义json温岭新站seo
  • 网站开发的五个阶段wordpress安装在本地
  • 郴州网站建设有哪些sem优化
  • 在百度怎么申请自己的网站深圳网站建设迅美
  • wordpress 企业网站教程网站开发集成软件
  • 专业的西安免费做网站wordpress手机端插件
  • 口碑好网站建设优化大师win10下载
  • 网站建设普及型小程序开发平台好的有哪些
  • 网站建设与管理专业凡科做的网站好吗
  • wordpress添加变量福州seo网站管理
  • 哔哩哔哩免费网站观看网站制作合同书
  • 自流井移动网站建设建设网站的一般步骤
  • 手机导航网站模板上海低价网站建设
  • 如何开公司注册需要多少钱东莞网站推广优化网上推广公司
  • 新闻门户网站制作教育培训网站开发
  • 网站建设公司哪个好一点最近一周的热点新闻