广州市官网网站建设报价,铜陵网站建设,做网站的公司叫什么名字好,网站行业认证怎么做ReadFromKafka组件
组件说明
从kafka中读取数据。
计算引擎
flink
有界性
Unbounded
组件分组
kafka
端口
Inport#xff1a;默认端口
outport#xff1a;默认端口
组件属性
名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HOST“”无是逗号分隔的Ka…ReadFromKafka组件
组件说明
从kafka中读取数据。
计算引擎
flink
有界性
Unbounded
组件分组
kafka
端口
Inport默认端口
outport默认端口
组件属性
名称展示名称默认值允许值是否必填描述例子kafka_hostKAFKA_HOST“”无是逗号分隔的Kafka broker列表。127.0.0.1:9092topicTOPIC“”无否读取数据的topic名。亦支持用分号间隔的topic列表如 ‘topic-1;topic-2’。 注意‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1topic_patternTOPIC_PATTERN“”无否匹配读取topic名称的正则表达式。在作业开始运行时所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)否Kafka consumer 的启动模式。earliest-offsetschemaSCHEMA“”无否Kafka消息的schema信息。id:int,name:string,age:intformatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)是用来反序列化Kafka消息的格式。注意该配置项和 ‘value.format’ 二者必需其一。jsongroupGROUP“”无否Kafka source的消费组id。如果未指定消费组ID则会使用自动生成的KafkaSource-{tableIdentifier}作为消费组ID。group_1propertiesPROPERTIES“”无否Kafka source连接器其他配置
ReadFromKafka示例配置
{flow: {name: DataGenTest,uuid: 1234,stops: [{uuid: 0000,name: DataGen1,bundle: cn.piflow.bundle.flink.common.DataGen,properties: {schema: [{\filedName\:\id\,\filedType\:\INT\,\kind\:\sequence\,\start\:1,\end\:10000},{\filedName\:\name\,\filedType\:\STRING\,\kind\:\random\,\length\:15},{\filedName\:\age\,\filedType\:\INT\,\kind\:\random\,\max\:100,\min\:1}],count: 100,ratio: 5}},{uuid: 1111,name: WriteToKafka1,bundle: cn.piflow.bundle.flink.kafka.WriteToKafka,properties: {kafka_host: hadoop01:9092,topic: test,schema: ,format: json,properties: {}}},{uuid: 2222,name: ReadFromKafka1,bundle: cn.piflow.bundle.flink.kafka.ReadFromKafka,properties: {kafka_host: hadoop01:9092,topic: test,group: test,startup_mode: earliest-offset,schema: id:int,name:string,age:int,format: json,properties: {}}},{uuid: 3333,name: ShowData1,bundle: cn.piflow.bundle.flink.common.ShowData,properties: {showNumber: 5000}}],paths: [{from: DataGen1,outport: ,inport: ,to: WriteToKafka1},{from: WriteToKafka1,outport: ,inport: ,to: ReadFromKafka1},{from: ReadFromKafka1,outport: ,inport: ,to: ShowData1}]}
}示例说明
本示例演示了通过DataGen组件生成idnameage3个字段100条数据每秒生成5条数据通过WriteToKafka组件将数据写入到kafka的test topic中然后通过ReadFromKafka组件从test topic中读取数据最后使用ShowData组件将数据打印在控制台。
字段描述
[{ filedName: id,filedType: INT,kind: sequence,start: 1,end: 10000},{ filedName: name,filedType: STRING,kind: random,length: 15},{ filedName: age,filedType: INT,kind: random,max: 100,min: 1}
]1.id字段
id字段类型为INT,使用sequence生成器序列生成器的起始值为1结束值为10000.
2.name字段
name字段类型为STRING,使用random生成器生成字符长度为15。
3.age字段
age字段类型为INT使用random生成器随机生成器的最小值为1最大值为100。