深圳市设计网站,建筑信息网,网站开发网络结构图,seo优化培训班一、目的
在实际项目的开发过程中#xff0c;不同Kafka主题的数据规模、数据频率#xff0c;需要配置不同的Flume参数#xff0c;而这一切的调试、配置工作#xff0c;都要建立在对Flume配置文件各参数含义的基础上
二、Flume各参数及其含义
#xff08;一#xff09;…一、目的
在实际项目的开发过程中不同Kafka主题的数据规模、数据频率需要配置不同的Flume参数而这一切的调试、配置工作都要建立在对Flume配置文件各参数含义的基础上
二、Flume各参数及其含义
一filePrefix
1、含义写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式
2、默认值为FlumeData
二fileSuffix
1、含义写入hdfs的文件名后缀,比如.lzo .log .txt
三inUsePrefix(一般不用管)
1、含义临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件
四inUseSuffix(一般不用管)
1、含义临时文件的文件名后缀
2、默认值:.tmp
五rollInterval
1、含义hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒
2、默认值:30s
3、如果设置成0,则表示不根据时间来滚动文件
4、注意:滚动roll指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据
六rollSize
1、含义当临时文件达到该大小(单位:bytes)时,滚动成目标文件
2、默认值:1024byte
3、 如果设置成0,则表示不根据临时文件大小来滚动文件
七rollCount
1、含义当events数据达到该数量时候,将临时文件滚动成目标文件
2、默认值:10
3、如果设置成0,则表示不根据events数据来滚动文件
八idleTimeout
1、含义当目前被打开的临时文件在该参数指定的时间秒内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
2、默认值:0
九batchSize
1、含义每个批次刷新到HDFS上的events数量
2、默认值:100
十codeC
1、含义文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
十一fileType
1、含义文件格式,包括:SequenceFile, DataStream,CompressedStream;默认值:SequenceFile
2、当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC
3、当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值
十二maxOpenFiles
1、含义最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭
2、默认值:5000
十三minBlockReplicas
1、含义写入HDFS文件块的最小副本数
2、默认值:HDFS副本数(一般不修改,HDFS副本数默认为3)
3、 该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件
十四writeFormat
1、含义写sequence文件的格式
2、包含:Text, Writable默认
十五callTimeout
1、含义执行HDFS操作的超时时间(单位:毫秒)
2、默认值:10000 (10s)
十六threadsPoolSize
1、含义hdfs sink启动的操作HDFS的线程数
2、默认值:10
十七rollTimerPoolSize
1、含义hdfs sink启动的根据时间滚动文件的线程数
2、默认值:1
十八kerberosPrincipal
1、含义HDFS安全认证kerberos配置
十九kerberosKeytab
1、含义HDFS安全认证kerberos配置
二十proxyUser
1、含义代理用户
二十一channel
1、含义管道
2、一个sink只能有一个管道,但一根管道可以有多个sink
二十二type
1、含义类型
2、Source类型 3、Sink类型 4、Channel类型 二十三path
1、含义写入hdfs的路径,需要包含文件系统标识,比如:hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_turnratio/day%Y-%m-%d/
2、可以使用flume提供的日期及%{host}表达式
二十四round
1、含义是否启用时间上的”舍弃”这里的”舍弃”类似于”四舍五入”
2、如果启用则会影响除了%t的其他所有时间表达式
3、默认值false
二十五roundValue
1、含义时间上进行“舍弃”的值
2、默认值1
二十六roundUnit
1、含义时间上进行”舍弃”的单位包含second,minute,hour
2、默认值seconds
二十七timeZone
1、含义时区
2、默认值Local Time
二十八useLocalTimeStamp
1、含义是否使用当地时间
2、默认值flase
二十九closeTries
1、含义hdfs sink关闭文件的尝试次数
2、默认值0
3、如果设置为1当一次关闭文件失败后hdfs sink将不会再次尝试关闭文件这个未关闭的文件将会一直留在那并且是打开状态
4、如果设置为0当一次关闭失败后hdfs sink会继续尝试下一次关闭直到成功
三十retryInterval
1、含义hdfs sink尝试关闭文件的时间间隔
2、如果设置为0表示不尝试相当于于将hdfs.closeTries设置成1
3、默认值180秒
三十一serializer
1、含义序列化类型
2、其他还有avro_event或者是实现了EventSerializer.Builder的类名
3、默认值TEXT
三十二设置3个round相关参数用来控制多久生成一个文件 #是否按照时间滚动文件夹 a1.sinks.k1.hdfs.round true #多少时间单位创建一个新的文件夹 a1.sinks.k1.hdfs.roundValue 10 #重新定义时间单位 a1.sinks.k1.hdfs.roundUnit second
三十三设置时间戳、刷新频率以及文件类型 #是否使用本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp true #积攒多少个 Event 才 flush 到 HDFS 一次 a1.sinks.k1.hdfs.batchSize 100 #设置文件类型可支持压缩 a1.sinks.k1.hdfs.fileType DataStream
三十四设置下方三个条件任意一个达到都会生成一个新的文件 #多久生成一个新的文件 a1.sinks.k1.hdfs.rollInterval 60 #设置每个文件的滚动大小大概是 1M a1.sinks.k1.hdfs.rollSize 1024000 #文件的滚动与 Event 数量无关 a1.sinks.k1.hdfs.rollCount 0
三、Flume配置文件案例
### Name agent, source, channels and sink alias a1.sources s1 a1.channels c1 a1.sinks k1
### define kafka source a1.sources.s1.type org.apache.flume.source.kafka.KafkaSource
# Maximum number of messages written to Channel in one batch a1.sources.s1.batchSize 5000
# Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached. a1.sources.s1.batchDurationMillis 2000
# set kafka broker address a1.sources.s1.kafka.bootstrap.servers 192.168.0.27:9092
# set kafka consumer group Id and offset consume # 官网推荐1.9.0版本只设置了topic但测试后不能正常消费需要添加消费组id自己写一个并定义偏移量消费方式 a1.sources.s1.kafka.consumer.group.id evaluation_group a1.sources.s1.kafka.consumer.auto.offset.reset earliest
# set kafka topic a1.sources.s1.kafka.topics topic_b_evaluation ### defind hdfs sink
a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix queue a1.sinks.k1.hdfs.fileSuffix .log a1.sinks.k1.hdfs.round true a1.sinks.k1.hdfs.roundValue 10 a1.sinks.k1.hdfs.roundUnit second a1.sinks.k1.hdfs.rollSize 10240000 a1.sinks.k1.hdfs.rollCount 0 a1.sinks.k1.hdfs.rollInterval 0 a1.sinks.k1.hdfs.idleTimeout 60 a1.sinks.k1.hdfs.minBlockReplicas 1 ### define channel from kafka source to hdfs sink # memoryChannel快速但是当设备断电数据会丢失 # FileChannel速度较慢即使设备断电数据也不会丢失 a1.channels.c1.type file # 这里不单独设置checkpointDir和dataDirs文件位置参考官网不设置会有默认位置 # channel store size a1.channels.c1.capacity 100000 # transaction size a1.channels.c1.transactionCapacity 10000 ### 绑定source、channel和sink a1.sources.s1.channels c1 a1.sinks.k1.channel c1