山东高阳建设公司网站,手机版万能视频提取器,缪斯设计,qq小程序打不开怎么办Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分#xff0c;比如术语、架构、编程模型、编程指南、基本的…Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分和实际的生产应用联系更为密切以及有一定开发难度的内容。 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明一般不会介绍知识点的信息更多的是提供一个一个可以具体使用的示例。本专栏不再分目录通过链接即可看出介绍的内容。
两专栏的所有文章入口点击Flink 系列文章汇总索引 文章目录 Flink 系列文章一、示例按照分组规则进行图形匹配-KeyedBroadcastProcessFunction1、maven依赖2、实现3、验证1、规则输入2、item输入3、控制台输出 本文详细的介绍了通过broadcast state的实现简单的模式匹配其中需要用到KeyedBroadcastProcessFunction。
如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外没有其他依赖。
一、示例按照分组规则进行图形匹配-KeyedBroadcastProcessFunction
本示例是简单的应用broadcast state实现简单模式匹配即实现 1、按照相同颜色进行分组在相同颜色组中按照规则进行匹配。 2、相同颜色的规则1长方形后是三角形 3、相同颜色的规则2正方形后是长方形
如匹配上述规则1或规则2则输出匹配成功。
1、maven依赖
propertiesencodingUTF-8/encodingproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetjava.version1.8/java.versionscala.version2.12/scala.versionflink.version1.17.0/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version!-- scopeprovided/scope --/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-compress/artifactIdversion1.24.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.2/version!-- scopeprovided/scope --/dependency/dependencies2、实现
package org.tablesql.join;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** Author: alanchan* * LastEditors: alanchan* * Description: 按照相同颜色进行分组在相同颜色组中按照规则进行匹配。相同颜色的规则1长方形后是三角形规则2正方形后是长方形*/
public class TestJoinDimKeyedBroadcastProcessFunctionDemo {DataNoArgsConstructorAllArgsConstructorstatic class Shape {private String name;private String desc;}DataNoArgsConstructorAllArgsConstructorstatic class Colour {private String name;private Long blue;private Long red;private Long green;}DataNoArgsConstructorAllArgsConstructorstatic class Item {private Shape shape;private Colour color;}DataNoArgsConstructorAllArgsConstructorstatic class Rule {private String name;private Shape first;private Shape second;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// item 实时流DataStreamItem itemStream env.socketTextStream(192.168.10.42, 9999).map(o - {// 解析item流// 数据结构Item[shape(name,desc);color(name,blue,red,green)]String[] lines o.split(;);String[] shapeString lines[0].split(,);String[] colorString lines[1].split(,);Shape shape new Shape(shapeString[0],shapeString[1]);Colour color new Colour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));return new Item(shape,color);});// rule 实时流DataStreamRule ruleStream env.socketTextStream(192.168.10.42, 8888).map(o - {// 解析rule流// 数据结构Rule[name;shape(name,desc);shape(name,desc)]String[] lines o.split(;);String name lines[0];String[] firstShapeString lines[1].split(,);String[] secondShapeString lines[2].split(,);Shape firstShape new Shape(firstShapeString[0],firstShapeString[1]);Shape secondShape new Shape(secondShapeString[0],secondShapeString[1]);return new Rule(name,firstShape,secondShape);}).setParallelism(1);// 将图形使用颜色进行划分KeyedStreamItem, Colour colorPartitionedStream itemStream.keyBy(new KeySelectorItem, Colour() {Overridepublic Colour getKey(Item value) throws Exception {return value.getColor();// 实现分组}});colorPartitionedStream.print(colorPartitionedStream:----);// 一个 map descriptor它描述了用于存储规则名称与规则本身的 map 存储结构MapStateDescriptorString, Rule ruleStateDescriptor new MapStateDescriptor(RulesBroadcastState,BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHintRule() {}));// 将rule定义为广播流广播规则并且创建 broadcast stateBroadcastStreamRule ruleBroadcastStream ruleStream.broadcast(ruleStateDescriptor);// 连接输出流,connect() 方法需要由非广播流来进行调用BroadcastStream 作为参数传入。DataStreamString output colorPartitionedStream.connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示// 1. key stream 中的 key 类型// 2. 非广播流中的元素类型// 3. 广播流中的元素类型// 4. 结果的类型在这里是 stringnew KeyedBroadcastProcessFunctionColour, Item, Rule, String() {// 存储部分匹配的结果即匹配了一个元素正在等待第二个元素// 用一个数组来存储因为同时可能有很多第一个元素正在等待private final MapStateDescriptorString, ListItem itemMapStateDesc new MapStateDescriptor(items,BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo(Item.class));// 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构private final MapStateDescriptorString, Rule ruleStateDescriptor new MapStateDescriptor(RulesBroadcastState,BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHintRule() {}));// 负责处理广播流的元素 Overridepublic void processBroadcastElement(Rule ruleValue,KeyedBroadcastProcessFunctionColour, Item, Rule, String.Context ctx,CollectorString out) throws Exception {// 得到广播流的存储状态ctx.getBroadcastState(MapStateDescriptorK, V stateDescriptor)// 查询元素的时间戳ctx.timestamp()// 查询目前的Watermarkctx.currentWatermark()// 目前的处理时间(processing time)ctx.currentProcessingTime()// 产生旁路输出ctx.output(OutputTagX outputTag, X value) // 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);}// 负责处理另一个流的元素Overridepublic void processElement(Item itemValue,KeyedBroadcastProcessFunctionColour, Item, Rule, String.ReadOnlyContext ctx,CollectorString out) throws Exception {final MapStateString, ListItem itemMapState getRuntimeContext().getMapState(itemMapStateDesc);final Shape shape itemValue.getShape();System.out.println(shape:shape);// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同ReadOnlyBroadcastStateString, Rule readOnlyBroadcastState ctx.getBroadcastState(ruleStateDescriptor);IterableEntryString, Rule iterableRule readOnlyBroadcastState.immutableEntries();for (EntryString, Rule entry : iterableRule) {final String ruleName entry.getKey();final Rule rule entry.getValue();// 初始化ListItem itemStoredList itemMapState.get(ruleName);if (itemStoredList null) {itemStoredList new ArrayList();}// 比较 shape if (shape.getName().equals(rule.second.getName()) !itemStoredList.isEmpty()) {for (Item item : itemStoredList) {// 符合规则收集匹配结果out.collect(匹配成功: item - itemValue);}itemStoredList.clear();}// 规则连续性设置if (shape.getName().equals(rule.first.getName())) {itemStoredList.add(itemValue);}// if (itemStoredList.isEmpty()) {itemMapState.remove(ruleName);} else {itemMapState.put(ruleName, itemStoredList);}}}});output.print(output:-------);env.execute();}}
3、验证
在netcat中启动两个端口分别是8888和99998888输入规则9999输入item然后关键控制台输出。
1、规则输入
red;rectangle,is a rectangle;tripe,is a tripe
green;square,is a square;rectangle,is a rectangle2、item输入
# 匹配成功
rectangle,is a rectangle;red,100,100,100
tripe,is a tripe;red,100,100,100# 匹配成功
square,is square;green,150,150,150
rectangle,is a rectangle;green,150,150,150# 匹配不成功
tripe,is tripe;blue,200,200,200# 匹配成功
rectangle,is a rectangle;blue,100,100,100
tripe,is a tripe;blue,100,100,100# 匹配不成功
tripe,is a tripe;blue,100,100,100
rectangle,is a rectangle;blue,100,100,100
3、控制台输出
colorPartitionedStream:----:9 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namered, blue100, red100, green100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle)
colorPartitionedStream:----:9 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namered, blue100, red100, green100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe)
output:-------:9 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namered, blue100, red100, green100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namered, blue100, red100, green100))
colorPartitionedStream:----:9 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namegreen, blue150, red150, green150))
output:-------:9 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namesquare, descis square), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namegreen, blue150, red150, green150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(namegreen, blue150, red150, green150))
colorPartitionedStream:----:3 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue200, red200, green200))
colorPartitionedStream:----:1 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100))
colorPartitionedStream:----:1 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100))
output:-------:1 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100))
colorPartitionedStream:----:1 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(nametripe, descis a tripe), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100))
colorPartitionedStream:----:1 TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shapeTestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(namerectangle, descis a rectangle), colorTestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(nameblue, blue100, red100, green100))
以上本文详细的介绍了通过broadcast state的实现简单的模式匹配其中需要用到KeyedBroadcastProcessFunction。