当前位置: 首页 > news >正文

wordpress微信公众平台插件关键词优化一年的收费标准

wordpress微信公众平台插件,关键词优化一年的收费标准,家居网站建设方案,深圳做网页背景#xff1a; 广播状态可以用于规则表或者配置表的实时更新#xff0c;本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…背景 广播状态可以用于规则表或者配置表的实时更新本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流程主流程中使用了两个Broadcast广播的状态这两个Broadcast广播的状态是独立的 // 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的也就是需要分别维度不能共享// Processing pipeline setupDataStreamAlert alerts transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid(DynamicKeyFunction).name(Dynamic Partitioning Function).keyBy((keyed) - keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid(DynamicAlertFunction).name(Dynamic Rule Evaluation Function);2.BroadcastProcessFunction的处理这里面会维护这个算子本身的广播状态并把所有的事件扩散发送到下一个算子 public class DynamicKeyFunctionextends BroadcastProcessFunctionTransaction, Rule, KeyedTransaction, String, Integer {Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录流转到下一个算子Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, CollectorKeyedTransaction, String, Integer out)throws Exception {ReadOnlyBroadcastStateInteger, Rule rulesState ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorKeyedTransaction, String, Integer out) throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态此外还有键值分区状态特别注意的是在处理广播元素时可以用applyToKeyedState方法对所有的键值分区状态应用某个方法对于ontimer方法依然可以访问键值分区状态和广播状态 /** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* License); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet; import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert; import com.ververica.field.dynamicrules.FieldsExtractor; import com.ververica.field.dynamicrules.Keyed; import com.ververica.field.dynamicrules.Rule; import com.ververica.field.dynamicrules.Rule.ControlType; import com.ververica.field.dynamicrules.Rule.RuleState; import com.ververica.field.dynamicrules.RuleHelper; import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors; import com.ververica.field.dynamicrules.Transaction; import java.math.BigDecimal; import java.util.*; import java.util.Map.Entry; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.accumulators.SimpleAccumulator; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */ Slf4j public class DynamicAlertFunctionextends KeyedBroadcastProcessFunctionString, KeyedTransaction, String, Integer, Rule, Alert {private static final String COUNT COUNT_FLINK;private static final String COUNT_WITH_RESET COUNT_WITH_RESET_FLINK;private static int WIDEST_RULE_KEY Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY Integer.MIN_VALUE 1;private transient MapStateLong, SetTransaction windowState;private Meter alertMeter;private MapStateDescriptorLong, SetTransaction windowStateDescriptor new MapStateDescriptor(windowState,BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHintSetTransaction() {}));Overridepublic void open(Configuration parameters) {windowState getRuntimeContext().getMapState(windowStateDescriptor);alertMeter new MeterView(60);getRuntimeContext().getMetricGroup().meter(alertsPerSecond, alertMeter);}// 键值分区状态和广播状态联合处理在这个方法中可以更新键值分区状态然后广播状态只能读取Overridepublic void processElement(KeyedTransaction, String, Integer value, ReadOnlyContext ctx, CollectorAlert out)throws Exception {long currentEventTime value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error(Rule with ID {} does not exist, value.getId());return;}if (rule.getRuleState() Rule.RuleState.ACTIVE) {Long windowStartForEvent rule.getWindowStartFor(currentEventTime);long cleanupTime (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulatorBigDecimal aggregator RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult aggregator.getLocalValue();boolean ruleResult rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,Rule rule.getRuleId() | value.getKey() : aggregateResult.toString() - ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态新增/删除或者整个清空,值得注意的是处理广播元素时可以对所有的键值分区状态应用某个函数比如这里当收到某个属于控制消息的广播消息时使用applyToKeyedState方法把所有的键值分区状态都清空Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorAlert out)throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastStateInteger, Rule rulesState, Context ctx) throws Exception {ControlType controlType command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.EntryInteger, Rule entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) - state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:IteratorEntryInteger, Rule entriesIterator rulesState.iterator();while (entriesIterator.hasNext()) {EntryInteger, Rule ruleEntry entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info(Removed Rule {}, ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime windowStartForEvent stateEventTime currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulatorBigDecimal aggregator, Rule rule) throws Exception {SetTransaction inWindow windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in DynamicKeyFunctionif (rule null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {Rule widestWindowRule broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() ! Rule.RuleState.ACTIVE) {return;}if (widestWindowRule null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态读取广播状态此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的不需要考虑并发访问的问题Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final CollectorAlert out)throws Exception {Rule widestWindowRule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);OptionalLong cleanupEventTimeWindow Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);OptionalLong cleanupEventTimeThreshold cleanupEventTimeWindow.map(window - timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime keys.next();if (stateEventTime threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}} } ps: ontimer方法和processElement方法是同步访问的没有并发的问题所以不需要考虑同时更新键值分区状态的线程安全问题 参考文献 https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
http://www.zqtcl.cn/news/735167/

相关文章:

  • 建设铝合金窗网站.net制作网站开发教程
  • 网站后台服务器内部错误wordpress 多级菜单
  • 怎样更新网站内容怎么查看网站是哪家公司做的
  • 建设网站网站建站建立一个网站平台需要多少钱
  • 学校网站模板 html网站建设技术路线
  • 图片网站如何做百度排名深入挖掘wordpress
  • 网站建设的前景网站建设分为哪三部分
  • 房地产公司网站下载校园二手信息网站建设
  • 有关网站空间不正确的说法是设计和建设企业网站心得和体会
  • 个人网站前置审批项怎么做投票 网站
  • 网站建设零金手指花总js源码下载从哪个网站能下载
  • 网站开发属于无形资产两人合伙做网站但不准备开公司
  • 五大类型网站网站建设投标文件
  • 崇明区建设镇网站装修公司网站制作
  • 哪些网站可以做房产推广呼家楼街道网站建设
  • 微网站怎么开通萝岗手机网站建设
  • 牙科医院网站开发内江市住房和城乡建设局网站电话号码
  • 网站建设的想法和意见芜湖的网站建设公司
  • 效果好的网站建设wordpress主题基础
  • html5建设摄影网站意义crm免费客户管理系统
  • win2008 建立网站网站策划书的撰写流程
  • 德泰诺网站建设百度网盘资源搜索引擎入口
  • 谁能给个网站谢谢wordpress 主题 后门
  • 学校网站建设目的seo教学免费课程霸屏
  • 会计公司网站模板微信网站如何制作软件
  • 烟台做网站多少钱.net网站做增删改
  • 什么网站专门做软件的深圳电商网站制作
  • 局域网做网站家装公司哪家比较好
  • 免费的行情软件网站在线使用wordpress视频分享
  • 内容平台策划书网站优化公司推荐