当前位置: 首页 > news >正文

海口网站建设咨询帝国生成网站地图

海口网站建设咨询,帝国生成网站地图,对外贸易平台有哪些,app运营专员背景#xff1a; 接上一篇文章#xff0c;ProcessWindowFunction 结合自定义触发器会有状态过大的问题#xff0c;本文就使用AggregateFunction结合自定义触发器来实现#xff0c;这样就不会导致状态过大的问题了 AggregateFunction结合自定义触发器实现 flink对于每个窗…背景 接上一篇文章ProcessWindowFunction 结合自定义触发器会有状态过大的问题本文就使用AggregateFunction结合自定义触发器来实现这样就不会导致状态过大的问题了 AggregateFunction结合自定义触发器实现 flink对于每个窗口只需要维护一个状态不像ProcessWindowFunction那样需要把窗口内收到的所有消息都作为状态存储起来 完整代码参见 package wikiedits.func;import java.text.SimpleDateFormat; import java.util.Date;import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;public class AggregateFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend(file:///D:/tmp/flink/checkpoint/aggregatetrigger));// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;// 更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据if (xxxNum % 2000 0) {System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n, name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorTuple2String, Integer mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.aggregate(new AggregateFunctionTuple2String, Integer, Tuple2String, Integer, Tuple2String, Integer() {// 1、初始值// 定义累加器初始值Overridepublic Tuple2String, Integer createAccumulator() {return new Tuple2String, Integer(, 0);}// 2、累加// 定义累加器如何基于输入数据进行累加Overridepublic Tuple2String, Integer add(Tuple2String, Integer value,Tuple2String, Integer accumulator) {accumulator.f0 value.f0;accumulator.f1 value.f1;return accumulator;}// 3、合并// 定义累加器如何和State中的累加器进行合并Overridepublic Tuple2String, Integer merge(Tuple2String, Integer acc1,Tuple2String, Integer acc2) {acc1.f1 acc2.f1;return acc1;}// 4、输出// 定义如何输出数据Overridepublic Tuple2String, Integer getResult(Tuple2String, Integer accumulator) {return accumulator;}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}} 通过这种方式我们就可以做到统计某个页面一天内至今为止的点击率每10s输出一次点击率的结果并且不会引起状态膨胀的问题 参考文献 https://www.cnblogs.com/Springmoon-venn/p/13667023.html
http://www.zqtcl.cn/news/223537/

相关文章:

  • 建设工程网站tcwordpress 标题入库
  • 网站开发简直广州网站制作后缀
  • 上海短视频seo优化网站wordpress 构建知识库
  • 做的网站图片不显示2018做网站赚钱不
  • 国内建站平台网站建设是什么科目
  • 响应式个人网站psd建设银行网站联系电话
  • 大型网站开发实战品牌网站建设费用要多少
  • 昆山网站建设昆山html5制作手机端页面
  • 做网站的国标有哪些达州网络推广
  • 站内seo和站外seo区别wordpress演示数据
  • 建设旅游网站财务分析创意设计公司网站
  • 张家港网站优化wordpress调用图片上传
  • 做网站要商标吗房产网站 设计方案
  • 做网站的费用怎么做账客户案例 网站建设
  • 怎么查询网站的备案号城乡建设杂志网站
  • 婚恋网站哪家做的最好北斗导航2022最新版手机版
  • 别墅效果图网站重庆金融公司网站建设
  • 中兴能源建设有限公司网站企业营销策划及推广
  • 外贸英文网站制作WordPress对接微信公众号
  • 推广网站建设花费得多少钱哪些平台可以发布软文
  • wordpress网站检测购物app大全
  • 遵义建设厅官方网站 元丰兰州网站设计有限公司
  • 芜湖做网站的公司排名贵阳好的网站建设公司
  • 网站建设 骏域网站建设专家最有效的15个营销方法
  • 大连品牌官网建站为什么有些网站更新的信息看不到
  • 富阳市网站域名申请好了怎么做网站
  • 做药物分析必须知道的网站网站攻击一般有那些
  • 一般网站做哪些端口映射那个网站做境外自由行便宜
  • 网站的建站过程公司seo是什么意思
  • 胜利油田局域网主页入口seo自学网官网