免费网站建设案例,郑州seo联系搜点网络效果好,旅游网站开发的国内外现状,记事本做网站的代码一、说明
衡量网站流量一个最简单的指标#xff0c;就是网站的页面浏览量#xff08;Page View#xff0c;PV#xff09;。用户每次打开一个页面便记录1次PV#xff0c;多次打开同一页面则浏览量累计。 一般来说#xff0c;PV与来访者的数量成正比#xff0c;但是PV并不…一、说明
衡量网站流量一个最简单的指标就是网站的页面浏览量Page ViewPV。用户每次打开一个页面便记录1次PV多次打开同一页面则浏览量累计。 一般来说PV与来访者的数量成正比但是PV并不直接决定页面的真实来访者数量如同一个来访者通过不断的刷新页面也可以制造出非常高的PV。接下来我们就用Flink算子来实现PV的统计。
二、测试数据准备
把数据文件 UserBehavior 复制到project的input目录下 用于封装数据的JavaBean类
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** Author lizhenchaoatguigu.cn* Date 2020/12/10 19:32*/
Data
NoArgsConstructor
AllArgsConstructor
public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;private Long timestamp;
}三、代码
pv实现思路1: WordCount
package com.lyh.flink06;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PVcount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.readTextFile(input/UserBehavior.csv).map(line - { // 对数据切割, 然后封装到POJO中String[] split line.split(,);return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),String.valueOf(split[3]),Long.valueOf(split[4]));}).filter(behavior - pv.equals(behavior.getBehavior())) //过滤出pv行为.map(behavior - Tuple2.of(pv, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG)) // 使用Tuple类型, 方便后面求和.keyBy(value - value.f0) // keyBy: 按照key分组.sum(1) // 求和.print();env.execute();}
}pv实现思路2: process
package com.lyh.flink06;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class PVprocess {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.readTextFile(input/UserBehavior.csv).map(line - {String[] split line.split(,);return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),String.valueOf(split[3]),Long.valueOf(split[4]));}).filter(behavior - pv.equals(behavior.getBehavior())).keyBy(UserBehavior::getBehavior).process(new KeyedProcessFunctionString, UserBehavior, Long() {long count 0;Overridepublic void processElement(UserBehavior userBehavior,Context ctx,CollectorLong out) throws Exception {count;out.collect(count);}}).print();env.execute();}
}四、运行结果