随州网站推广,专业网站优化方案,市场营销策划报告,给微商做网站系列文章目录
Flink项目实战篇 基于Flink的城市交通监控平台#xff08;上#xff09; Flink项目实战篇 基于Flink的城市交通监控平台#xff08;下#xff09; 文章目录 系列文章目录1. 项目整体介绍1.1 项目架构1.2 项目数据流1.3 项目主要模块 2. 项目数据字典2.1 卡口…系列文章目录
Flink项目实战篇 基于Flink的城市交通监控平台上 Flink项目实战篇 基于Flink的城市交通监控平台下 文章目录 系列文章目录1. 项目整体介绍1.1 项目架构1.2 项目数据流1.3 项目主要模块 2. 项目数据字典2.1 卡口车辆采集数据2.2 城市交通管理数据表2.3 车辆轨迹数据表 3. 实时卡口监控分析3.1 创建Maven项目3.2 准备数据3.3 实时车辆超速监控3.4 实时卡口拥堵情况监控3.5 实时最通畅的TopN卡口 1. 项目整体介绍
近几年来随着国内经济的快速发展高速公路建设步伐不断加快全国机动车辆、驾驶员数量迅速增长交通管理工作日益繁重压力与日俱增。为了提高公安交通管理工作的科学化、现代化水平缓解警力不足加强和保障道路交通的安全、有序和畅通减少道路交通违法和事故的发生全国各地建设和使用了大量的“电子警察”、“高清卡口”、“固定式测速”、“区间测速”、“便携式测速”、“视频监控”、“预警系统”、“能见度天气监测系统”、“LED信息发布系统”等交通监控系统设备。尽管修建了大量的交通设施增加了诸多前端监控设备但交通拥挤阻塞、交通安全状况仍然十分严重。由于道路上交通监测设备种类和生产厂家繁多目前还没有一个统一的数据采集和交换标准无法对所有的设备、数据进行统一、高效的管理和应用造成各种设备和管理软件混用的局面给使用单位带来了很多不便使得国家大量的基础建设投资未达到预期的效果。各交警支队的设备大都采用本地的数据库管理交警总队无法看到各支队的监测设备及监测信息严重影响对全省交通监测的宏观管理目前网络状况为设备专网、互联网、公安网并存的复杂情况需要充分考虑公安网的安全性同时要保证数据的集中式管理监控数据需要与“六合一”平台、全国机动车稽查布控系统等的数据对接迫切需要一个全盘考虑面向交警交通行业的智慧交通管控指挥平台系统。
智慧交通管控指挥平台建成后达到了以下效果目标
交通监视和疏导通过系统将监视区域内的现场图像传回指挥中心使管理人员直接掌握车辆排队、堵塞、信号灯等交通状况及时调整信号配时或通过其他手段来疏导交通改变交通流的分布以达到缓解交通堵塞的目的。交通警卫通过突发事件的跟踪提高处置突发事件的能力。建立公路事故、事件预警系统的指标体系及多类分析预警模型实现对高速公路通行环境、交通运输对象、交通运输行为的综合分析和预警建立真正意义上的分析及预警体系。及时准确地掌握所监视路口、路段周围的车辆、行人的流量、交通治安情况等为指挥人员提供迅速直观的信息从而对交通事故和交通堵塞做出准确判断并及时响应。收集、处理各类公路网动静态交通安全信息分析研判交通安全态势和事故隐患并进行可视化展示和预警提示。提供接口与其他平台信息共享和关联应用基于各类动静态信息的大数据分析处理实现交通违法信息的互联互通、源头监管等功能。
1.1 项目架构
本项目是与公安交通管理综合应用平台、机动车缉查布控系统等对接的并且基于交通部门现有的数据平台上进行的数据实时分析项目。 卡口道路上用于监控的某个点可能是十字路口也可能是高速出口等。 通道每个卡口上有多个摄像头每个摄像头有拍摄的方向。这些摄像头也叫通道。 “违法王“车辆 该车辆违法未处理超过50次以上的车。 摄像头拍照识别 1一次拍照识别经过卡口摄像头进行的识别识别对象的车辆号牌信息、车辆号牌颜色信息等基于车辆号牌和车辆颜色信息能够实现基本的违法行为辨识、车辆黑白名单比对报警等功能。 2二次拍照识别可以通过时间差和距离自动计算出车辆的速度。
1.2 项目数据流 实时处理流程如下 http请求 --数据采集接口–数据目录– flume监控目录[监控的目录下的文件是按照日期分的] --Kafka --Flink分析数据 -- Mysql[实时监控数据保存]
1.3 项目主要模块 2. 项目数据字典
2.1 卡口车辆采集数据
卡口数据通过Flume采集过来之后存入Kafka中其中数据的格式为
(action_time long --摄像头拍摄时间戳精确到秒, monitor_id string --卡口号, camera_id string --摄像头编号, car string --车牌号码, speed double --通过卡扣的速度, road_id string --道路id, area_id string --区域id,
)其中每个字段之间使用逗号隔开。 区域ID代表一个城市的行政区域。 摄像头编号一个卡口往往会有多个摄像头每个摄像头都有一个唯一编号。 道路ID城市中每一条道路都有名字比如蔡锷路。交通部门会给蔡锷路一个唯一编号。
2.2 城市交通管理数据表
Mysql数据库中有两张表是由城市交通管理平台提供的本项目需要读取这两张表的数据来进行分析计算。 1城市区域表 t_area_info
DROP TABLE IF EXISTS t_area_info;
CREATE TABLE area_info (area_id varchar(255) DEFAULT NULL,area_name varchar(255) DEFAULT NULL
)
--导入数据
INSERT INTO t_area_info VALUES (01, 海淀区);
INSERT INTO t_area_info VALUES (02, 昌平区);
INSERT INTO t_area_info VALUES (03, 朝阳区);
INSERT INTO t_area_info VALUES (04, 顺义区);
INSERT INTO t_area_info VALUES (05, 西城区);
INSERT INTO t_area_info VALUES (06, 东城区);
INSERT INTO t_area_info VALUES (07, 大兴区);
INSERT INTO t_area_info VALUES (08, 石景山);2城市“违法”车辆列表 城市“违法”车辆一般是指需要进行实时布控的违法车辆。
DROP TABLE IF EXISTS t_violation_list;
CREATE TABLE t_violation_list (id int(11) NOT NULL AUTO_INCREMENT,car varchar(255) DEFAULT NULL,violation varchar(1000) DEFAULT NULL,create_time bigint(20) DEFAULT NULL,detail varchar(1000) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;3城市卡口限速信息表 城市中有些卡口有限制设置一般超过当前限速的10%要扣分。
DROP TABLE IF EXISTS t_monitor_info;
CREATE TABLE t_monitor_info (area_id varchar(255) DEFAULT NULL,road_id varchar(255) NOT NULL,monitor_id varchar(255) NOT NULL,speed_limit int(11) DEFAULT NULL,PRIMARY KEY (area_id,road_id,monitor_id)
) ENGINEInnoDB DEFAULT CHARSETutf8;
--导入数据
INSERT INTO t_monitor_info VALUES (01,10,0000,60);
INSERT INTO t_monitor_info VALUES (02,11,0001,60);
INSERT INTO t_monitor_info VALUES (01,12,0002,80);
INSERT INTO t_monitor_info VALUES (03,13,0003,100);2.3 车辆轨迹数据表
在智能车辆布控模块中需要保存一些车辆的实时行驶轨迹为了方便其他部门和项目方便查询获取我们在Mysql数据库设计一张车辆实时轨迹表。如果数据量太多需要设置在HBase中。
DROP TABLE IF EXISTS t_track_info;
CREATE TABLE t_track_info (id int(11) NOT NULL AUTO_INCREMENT,car varchar(255) DEFAULT NULL,action_time bigint(20) DEFAULT NULL,monitor_id varchar(255) DEFAULT NULL,road_id varchar(255) DEFAULT NULL,area_id varchar(255) DEFAULT NULL,speed double DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;3. 实时卡口监控分析
首先要实现的是实时卡口监控分析由于前面课程项目中已经讲解了数据的ETL本项目我们省略数据采集等ETL操作。我们将读取Kafka中的数据集来进行分析。 项目主体用Scala编写采用IDEA作为开发环境进行项目编写采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。
3.1 创建Maven项目
打开IDEA创建一个maven项目我们整个项目需要的工具的不同版本可能会对程序运行造成影响所以应该在porm.xml文件的最上面声明所有工具的版本信息。
在pom.xml中加入以下配置
propertiesflink.version1.9.1/flink.versionscala.binary.version2.11/scala.binary.versionkafka.version0.11.0.0/kafka.version
/properties1添加项目依赖 对于整个项目而言所有模块都会用到flink相关的组件添加Flink相关组件依赖
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupId artifactIdflink-streaming-scala_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_${scala.binary.version}/artifactIdversion${kafka.version}/version/dependencydependencygroupIdorg.apache.flink/groupId artifactIdflink-connector-kafka_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdredis.clients/groupIdartifactIdjedis/artifactIdversion2.8.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-cep-scala_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency
/dependencies2添加Scala和打包插件
build
plugins!-- 该插件用于将Scala代码编译成class文件 --plugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.4.6/versionexecutionsexecution!-- 声明绑定到maven的compile阶段 --goalsgoaltestCompile/goal/goals/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin
/plugins
/build3.2 准备数据
由于在前面的课程中已经学过数据的采集和ETL本项目不再赘述现在我们直接随机生成数据到文件中方便测试同时也写入Kafka。
项目中模拟车辆速度数据和车辆经过卡扣个数使用到了高斯分布高斯分布就是正态分布。“正态分布”(Normal Distribution)可以描述所有常见的事物和现象正常人群的身高、体重、考试成绩、家庭收入等等。这里的描述是什么意思呢就是说这些指标背后的数据都会呈现一种中间密集、两边稀疏的特征。以身高为例服从正态分布意味着大部分人的身高都会在人群的平均身高上下波动特别矮和特别高的都比较少见正态分布非常常见。 基于以上所以需要在pom.xml中导入高斯分布需要的依赖包
dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-math3/artifactIdversion3.6.1/version
/dependency生成高斯标准分布的代码如下
//获取随机数生成器
val generator: JDKRandomGenerator new JDKRandomGenerator()
//随机生成高斯分布的数据
val grg: GaussianRandomGenerator new GaussianRandomGenerator(generator)
//获取标准正态分布的数据
println(s随机生成数据为${grg.nextNormalizedDouble()})模拟生成数据的代码如下
/*** 模拟生成数据,这里将数据生产到Kafka中同时生成到文件中*/
object GeneratorData {def main(args: Array[String]): Unit {//创建文件流val pw new PrintWriter(./data/traffic_data)//创建Kafka 连接propertiesval props new Properties()props.setProperty(bootstrap.servers,mynode1:9092,mynode2:9092,mynode3:9092)props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer)props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer)val random new Random()//创建Kafka produerval producer new KafkaProducer[String,String](props)//车牌号使用的地区val locations Array[String](京,津,京,鲁,京,京,冀,京,京,粤,京,京)//模拟车辆个数这里假设每日有30万辆车信息for(i - 1 to 30000){//模拟每辆车的车牌号,%05d.format(100000) %05dd代表数字5d代表数字长度为5位不足位数前面补0 。 例如京A88888val car locations(random.nextInt(12))(65random.nextInt(26)).toChar%05d.format(random.nextInt(100000))//模拟车辆经过的卡扣数,使用高斯分布假设正常每辆车每日经过卡扣有30个val generator new GaussianRandomGenerator(new JDKRandomGenerator())val monitorThreshold: Int 1(generator.nextNormalizedDouble()*30).abs.toInt //generator.nextNormalizedDouble() 处于-1 ~ 1 之间//模拟拍摄时间val day DateUtils.getTodayDate()var hour DateUtils.getHour()var flag 0for(j - 1 to monitorThreshold){flag1//模拟monitor_id ,4位长度val monitorId %04d.format(random.nextInt(9))//模拟camear_id ,5为长度val camearId %05d.format(random.nextInt(100000))//模拟road_id ,2为长度val roadId %02d.format(random.nextInt(50))//模拟area_id ,2为长度val areaId %02d.format(random.nextInt(8))//模拟速度 使用高斯分布,速度大多位于90 左右val speed %.1f.format(60 (generator.nextNormalizedDouble()*30).abs)//模拟action_timeif(flag % 30 0 flag ! 0 ){hour (hour.toInt1).toString}val currentTime day hour:DateUtils.getMinutesOrSeconds():DateUtils.getMinutesOrSeconds()//获取action_time 时间戳val actionTime: Long DateUtils.getTimeStamp(currentTime)var oneInfo s$actionTime,$monitorId,$camearId,$car,$speed,$roadId,$areaIdprintln(soneInfo $oneInfo)//写入文件pw.write(oneInfo)pw.println()//写入kafka:producer.send(new ProducerRecord[String,String](traffic-topic,oneInfo))}}pw.flush()pw.close()producer.close()}
}3.3 实时车辆超速监控
在城市交通管理数据库中存储了每个卡口的限速信息但是不是所有卡口都有限速信息其中有一些卡口有限制。Flink中有广播状态流,JobManger统一管理TaskManger中正在运行的Task不可以修改这个广播状态。只能定时更新自定义Source。
我们通过实时计算需要把所有超速超过10%的车辆找出来并写入关系型数据库中。超速结果表如下
DROP TABLE IF EXISTS t_speeding_info;
CREATE TABLE t_speeding_info (id int(11) NOT NULL AUTO_INCREMENT,car varchar(255) NOT NULL,monitor_id varchar(255) DEFAULT NULL,road_id varchar(255) DEFAULT NULL,real_speed double DEFAULT NULL,limit_speed int(11) DEFAULT NULL,
action_time bigint(20) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;在当前需求中需要不定时的从数据库表中查询所有限速的卡口再根据限速的卡口列表来实时的判断是否存在超速的车辆如果找到超速的车辆把这些车辆超速的信息保存到Mysql数据库的超速违章记录表中t_speeding_info。
我们把查询限速卡口列表数据作为一个事件流车辆通行日志数据作为第二个事件流。广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个operator的所有并发实例这些事件将被保存为状态。另一个流的事件不会被广播而是发送给同一个operator的各个实例并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大一个吞吐小或者需要动态修改处理逻辑的情况。
我们对两个流使用了connect()方法并在连接之后调用BroadcastProcessFunction接口处理两个流
processBroadcastElement()方法每次收到广播流的记录时会调用。将接收到的卡口限速记录放入广播状态中processElement()方法接受到车辆通行日志流的每条消息时会调用。并能够对广播状态进行只读操作以防止导致跨越类中多个并发实例的不同广播状态的修改。
代码如下
/*** 监控超速的车辆信息* 思路从mysql中读取卡扣下的限速信息通过广播流进行广播然后与从kafka中读取的车流量监控事件流进行connect处理* 广播状态操作步骤* 1).读取广播流的DStream数据* 2).将以上DStream数据广播出去* 3).主流与广播流进行Connect关联调用 process 底层API处理* 4).实现process方法中 BroadcastProcessFunction 类下的两个方法进行数据处理*/
object OutOfSpeedMonitor {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._env.setParallelism(1)val props new Properties()props.setProperty(bootstrap.servers,mynode1:9092,mynode2:9092,mynode3:9092)props.setProperty(group.id,testgroup1)props.setProperty(key.deserializer,classOf[StringDeserializer].getName)props.setProperty(value.deserializer,classOf[StringDeserializer].getName)props.setProperty(auto.offset.reset,latest)//读取Kafka中的监控车辆事件流val mainDStream: DataStream[TrafficLog] env.addSource(new FlinkKafkaConsumer[String](traffic-topic, new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] env.socketTextStream(mynode5,9999).map(line {val arr: Array[String] line.split(,)TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))})//广播状态流 - 卡扣限速信息val broadCastStream: BroadcastStream[MonitorLimitSpeedInfo] env.addSource(new JdbcReadSource(MonitorLimitSpeedInfo)).map(one {one.asInstanceOf[MonitorLimitSpeedInfo]}).broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)val outOfSpeedCarInfoDStream: DataStream[OutOfSpeedCarInfo] mainDStream.connect(broadCastStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo] {//当有车辆监控事件时会被调用override def processElement(trafficLog: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#ReadOnlyContext, out: Collector[OutOfSpeedCarInfo]): Unit {//道路_卡扣val roadMonitor trafficLog.roadId_trafficLog.monitorIdval info: MonitorLimitSpeedInfo ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR).get(roadMonitor)if (info ! null) {//获取当前车辆真实的速度val realSpeed: Double trafficLog.speed//获取当前卡扣限速信息val limitSpeed: Int info.speedLimit//速度超过限速10% 就是超速车辆if (realSpeed limitSpeed * 1.1) {out.collect(OutOfSpeedCarInfo(trafficLog.car, trafficLog.monitorId, trafficLog.roadId, realSpeed, limitSpeed, trafficLog.actionTime))}}}//每次收到广播流数据时都会被调用将接收到的卡扣限速记录放入到广播状态中override def processBroadcastElement(monitorLimitSpeedInfo: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#Context, out: Collector[OutOfSpeedCarInfo]): Unit {val bcState: BroadcastState[String, MonitorLimitSpeedInfo] ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)//key : 道路_卡扣 value :monitorLimitSpeedInfobcState.put(monitorLimitSpeedInfo.roadId_monitorLimitSpeedInfo.monitorId, monitorLimitSpeedInfo)}})//将超速车辆的结果保存到 mysql 表 t_speeding_info 中。val sink: JdbcWriteSink[OutOfSpeedCarInfo] new JdbcWriteSink(OutOfSpeedCarInfo)outOfSpeedCarInfoDStream.addSink(sink)env.execute()}
}3.4 实时卡口拥堵情况监控
卡口的实时拥堵情况其实就是通过卡口的车辆平均车速为了统计实时的平均车速这里设定一个滑动窗口窗口长度是为5分钟滑动步长为1分钟。平均车速当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量 并且在Flume采集数据的时候我们发现数据可能出现时间乱序问题最长迟到5秒。
实时卡口平均速度需要保存到Mysql数据库中结果表设计为
DROP TABLE IF EXISTS t_average_speed;
CREATE TABLE t_average_speed (id int(11) NOT NULL AUTO_INCREMENT,start_time bigint(20) DEFAULT NULL,end_time bigint(20) DEFAULT NULL,monitor_id varchar(255) DEFAULT NULL,avg_speed double DEFAULT NULL,car_count int(11) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;完整的代码
object MonitorAvgSpeedMonitor {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props new Properties()props.setProperty(bootstrap.servers,mynode1:9092,mynode2:9092,mynode3:9092)props.setProperty(group.id,testgroup2)props.setProperty(key.deserializer,classOf[StringDeserializer].getName)props.setProperty(value.deserializer,classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)// val mainDStream: DataStream[TrafficLog] env.addSource(new FlinkKafkaConsumer[String](traffic-topic, new SimpleStringSchema(), props))val mainDStream: DataStream[TrafficLog] env.socketTextStream(mynode5,9999).map(line {val arr: Array[String] line.split(,)val actionTime arr(0).toLongval monitorId arr(1)val cameraId arr(2)val car arr(3)val speed arr(4).toDoubleval roadId arr(5)val areaId arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long element.actionTime})mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5),Time.minutes(1))//统计每个卡扣通过车辆数统计每个卡扣下的车辆总速度和使用增量函数.aggregate(new AggregateFunction[TrafficLog,(Int,Double),(Int,Double)] {override def createAccumulator(): (Int, Double) (0,0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) (accumulator._11,accumulator._2value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) (a._1b._1,a._2b._2)},new ProcessWindowFunction[(Int,Double),MonitorAvgSpeedInfo,String,TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit {val monitorId keyval avgSpeed (elements.last._2/elements.last._1).formatted(%.2f).toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart,context.window.getEnd,monitorId,avgSpeed,elements.last._1))}}).addSink(new JdbcWriteSink[MonitorAvgSpeedInfo](MonitorAvgSpeedInfo))env.execute()}3.5 实时最通畅的TopN卡口
所谓的最通畅的卡口其实就是当时的车辆数量最少的卡口。这里有两种实现方式一种是基于上一个功能的基础上再次开启第二个窗口操作然后使用AllWindowFunction实现一个自定义的TopN函数Top来计算车速排名前3名的卡口并将排名结果格式化成字符串便于后续输出。另外一种是使用窗口函数对滑动窗口内的数据全量计算并排序计算。
1基于上个功能基础上完整的代码
/*** 基于 实时卡扣拥堵情况业务 基础之上进行统计*/
object FindTop5MonitorInfo2 {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._val props new Properties()props.setProperty(bootstrap.servers,mynode1:9092,mynode2:9092,mynode3:9092)props.setProperty(group.id,testgroup2)props.setProperty(key.deserializer,classOf[StringDeserializer].getName)props.setProperty(value.deserializer,classOf[StringDeserializer].getName)//使用时间为 事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置线程为1env.setParallelism(1)val mainDStream: DataStream[TrafficLog] env.addSource(new FlinkKafkaConsumer[String](traffic-topic, new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] env.socketTextStream(mynode5,9999).map(line {val arr: Array[String] line.split(,)val actionTime arr(0).toLongval monitorId arr(1)val cameraId arr(2)val car arr(3)val speed arr(4).toDoubleval roadId arr(5)val areaId arr(6)TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long element.actionTime})val monitorAvgSpeedDStream: DataStream[MonitorAvgSpeedInfo] mainDStream.keyBy(_.monitorId).timeWindow(Time.minutes(5), Time.minutes(1))//统计每个卡扣通过车辆数统计每个卡扣下的车辆总速度和使用增量函数.aggregate(new AggregateFunction[TrafficLog, (Int, Double), (Int, Double)] {override def createAccumulator(): (Int, Double) (0, 0.0)override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) (accumulator._1 1, accumulator._2 value.speed)override def getResult(accumulator: (Int, Double)): (Int, Double) accumulatoroverride def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) (a._1 b._1, a._2 b._2)},new ProcessWindowFunction[(Int, Double), MonitorAvgSpeedInfo, String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit {val monitorId keyval avgSpeed (elements.last._2 / elements.last._1).formatted(%.2f).toDoubleout.collect(new MonitorAvgSpeedInfo(context.window.getStart, context.window.getEnd, monitorId, avgSpeed, elements.last._1))}}).assignAscendingTimestamps(masi {masi.endTime})//设置下一个窗口的时间//这里设置一个滚动窗口每隔1分钟对以上所有卡扣对应的平均速度进行排序得到对应的结果monitorAvgSpeedDStream.timeWindowAll(Time.minutes(1)).process(new ProcessAllWindowFunction[MonitorAvgSpeedInfo,String,TimeWindow] {override def process(context: Context, elements: Iterable[MonitorAvgSpeedInfo], out: Collector[String]): Unit {val builder new StringBuilder(s窗口起始时间${context.window.getStart} - ${context.window.getEnd},最拥堵的前3个卡扣信息如下)val infoes: List[MonitorAvgSpeedInfo] elements.toList.sortWith((masi1,masi2){masi1.avgSpeed masi2.avgSpeed}).take(3)for(masi - infoes){builder.append(smonitorId : ${masi.monitorId},avgSpeed : ${masi.avgSpeed} |)}out.collect(builder.toString())}}).print()env.execute()}
}2滑动窗口全量计算
object FindTop5MonitorInfo1 {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment//导入隐式转换import org.apache.flink.streaming.api.scala._//设置并行度为1env.setParallelism(1)//设置事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val props new Properties()props.setProperty(bootstrap.servers,mynode1:9092,mynode2:9092,mynode3:9092)props.setProperty(group.id,testgroup3)props.setProperty(key.deserializer,classOf[StringDeserializer].getName)props.setProperty(value.deserializer,classOf[StringDeserializer].getName)val mainDStream: DataStream[TrafficLog] env.addSource(new FlinkKafkaConsumer[String](traffic-topic, new SimpleStringSchema(), props).setStartFromEarliest())
// val mainDStream: DataStream[TrafficLog] env.socketTextStream(mynode5,9999).map(line {val arr: Array[String] line.split(,)TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {override def extractTimestamp(element: TrafficLog): Long element.actionTime})mainDStream.timeWindowAll(Time.minutes(1)).aggregate(//返回数据为 Map[String,Double] Map[卡扣平均速度]new AggregateFunction[TrafficLog,Map[String,(Int,Double)],Map[String,Double]]{//初始化一个Map[卡扣(当前卡扣对应总车辆数当前卡扣下所有车辆总速度和)]override def createAccumulator(): Map[String, (Int, Double)] Map()override def add(value: TrafficLog, accMap: Map[String, (Int, Double)]): Map[String, (Int, Double)] {//获取当前一条数据的monitorIDval monitorId: String value.monitorIdif(accMap.contains(monitorId)){//当前map中包含此卡扣accMap.put(monitorId,(accMap.get(monitorId).get._11,accMap.get(monitorId).get._2value.speed))}else{accMap.put(monitorId,(1,value.speed))}accMap}override def getResult(accumulator: Map[String,(Int, Double)]): Map[String, Double] {accumulator.map(tp{val monitorId: String tp._1val totalCarCount: Int tp._2._1val totalSpeed: Double tp._2._2(monitorId,(totalSpeed/totalCarCount).formatted(%.2f).toDouble)})}//合并不同线程处理的数据override def merge(a: Map[String, (Int, Double)], b: Map[String, (Int, Double)]): Map[String, (Int, Double)] {b.foreach(tp{val monitorId: String tp._1val carCount: Int tp._2._1val totalSpeed: Double tp._2._2if(a.contains(monitorId)){//第一个map中包含当前卡扣数据a.put(monitorId,(a.get(monitorId).get._1 carCount,a.get(monitorId).get._2totalSpeed))}else{//第一个map中不包含当前卡扣数据a.put(monitorId,tp._2)}})a}},new AllWindowFunction[Map[String, Double],String,TimeWindow] {override def apply(window: TimeWindow, input: scala.Iterable[mutable.Map[String, Double]], out: Collector[String]): Unit {val tuples: List[(String, Double)] input.last.toList.sortWith((tp1,tp2){tp1._2 tp2._2}).take(3)val returnStr new StringBuilder(s窗口起始时间${window.getStart} - ${window.getEnd} ,最拥堵前3个卡扣信息 )for(tp - tuples){returnStr.append(smonitorId ${tp._1} ,avgSpeed ${tp._2} |)}out.collect(returnStr.toString())}}).print()env.execute()