1 Environment overview
Linux system: CentOS. Prerequisites already installed: JDK, Hadoop, ZooKeeper and Kafka. The versions are:
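| Software | Version | Description |
|---|---|---|
| CentOS | 7 | Linux distribution |
| JDK | 1.8 | Java Development Kit |
| Hadoop | 2.10.0 | Foundational big data ecosystem component |
| ZooKeeper | 3.5.7 | Distributed coordination service |
| Kafka | 3.0 | Distributed message queue |
| Flume | 1.9.0 | Distributed log collection and transport component |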
2 Errors

Scenario 1: Flume dynamically monitors a directory in which several log files change, and collects and ships them to Kafka. The error log:

    org.apache.flume.FlumeException: Error creating positionFile parent directories
        at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flume
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
        at java.nio.file.Files.createDirectory(Files.java:674)
        at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
        at java.nio.file.Files.createDirectories(Files.java:727)
        at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)
        ... 11 more

The conf file is as follows:

    # Define components
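    a1.sources = r1
    a1.channels = c1

    # Configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json

    # Configure the channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false

    # Bind the source to the channel
    a1.sources.r1.channels = c1

The cause is that the parent directory of positionFile (/export/server/flume) already exists when Flume tries to create it. Note that Files.createDirectories normally tolerates an existing directory: it throws FileAlreadyExistsException only when the path exists but is not a directory, and on JDK 8 that check does not follow symbolic links. So /export/server/flume is most likely a symbolic link to the actual Flume installation directory.

Scenario 2: the application writes to app.log; once a day the file is renamed with a date suffix and a new app.log is created. Flume then collects the whole renamed file again, so every log line ends up being collected twice.

About Taildir: the Taildir Source maintains a position file in JSON format and periodically writes the latest read position of each file into it, which is what lets it resume from where it left off. The position file looks like this:

    [
      {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"},
      {"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.txt"}
    ]

Flume checks both the inode and the file path to decide whether it is looking at the same file. After the daily rename, the old log keeps its inode but has a different path; if the new name still matches the app.* pattern, Flume treats it as a new file and reads it again from the beginning.

Note: in Linux, the area that stores a file's metadata is called an inode. Every inode has a number, and the operating system uses that number to tell files apart: internally, Unix/Linux identifies files by inode number, not by file name.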
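To see the inode behaviour behind scenario 2, here is a small self-contained Java sketch. It only runs on Linux/Unix, because it reads the JDK's "unix:ino" file attribute. The file names app.log and app.log.2023-01-01 are hypothetical stand-ins for the daily rotation; the sketch is an illustration, not part of Flume.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class InodeRenameDemo {

    // Reads the inode number through the "unix" file attribute view (Linux/Unix only).
    static long inodeOf(Path file) throws IOException {
        return (Long) Files.getAttribute(file, "unix:ino");
    }

    // Simulates a daily rotation.
    // Returns {inode before rename, inode after rename, inode of the new app.log}.
    static long[] simulateRotation(Path dir) throws IOException {
        Path current = dir.resolve("app.log");
        Files.write(current, "first line\n".getBytes(StandardCharsets.UTF_8));
        long before = inodeOf(current);

        // Rename app.log with a date suffix (hypothetical name), as the rotation would.
        Path rotated = dir.resolve("app.log.2023-01-01");
        Files.move(current, rotated);
        long afterRename = inodeOf(rotated);

        // The application then starts writing to a brand-new app.log.
        Files.write(current, new byte[0]);
        long fresh = inodeOf(current);

        return new long[] {before, afterRename, fresh};
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("taildir-inode-demo");
        long[] inodes = simulateRotation(dir);
        System.out.println("app.log before rename: inode " + inodes[0]);
        System.out.println("renamed file:          inode " + inodes[1]);
        System.out.println("new app.log:           inode " + inodes[2]);
    }
}
```

The first two numbers are identical (a rename keeps the inode), while the new app.log gets a different one. That is exactly the combination where a check on both inode and path sees a "new" file.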
3 Solutions
There are two ways to fix scenario 1. The first: since the error comes from a parent directory that already exists, point positionFile at a different location whose parent is a plain directory. The second: modify the source code. Download the Flume 1.9.0 source from the official archive (https://archive.apache.org/dist/flume/) and look at line 170 of TaildirSource, which is where the error is raised:

    @Override
    public synchronized void configure(Context context) {
      String fileGroups = context.getString(FILE_GROUPS);
      Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);

      filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
          fileGroups.split("\\s+"));
      Preconditions.checkState(!filePaths.isEmpty(),
          "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");

      String homePath = System.getProperty("user.home").replace('\\', '/');
      positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
      Path positionFile = Paths.get(positionFilePath);
      try {
        // Parent directories are created here; this is where it fails if the path already exists
        Files.createDirectories(positionFile.getParent());
      } catch (IOException e) {
        throw new FlumeException("Error creating positionFile parent directories", e);
      }
      headerTable = getTable(context, HEADERS_PREFIX);
      batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
      skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
      byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
      idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
      writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
      cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
          DEFAULT_CACHE_PATTERN_MATCHING);

      backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
          PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
      maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
          PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
      fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER);
      fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY);
      maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
      if (maxBatchCount <= 0) {
        maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
        logger.warn("Invalid maxBatchCount specified, initializing source "
            + "default maxBatchCount of {}", maxBatchCount);
      }

      if (sourceCounter == null) {
        sourceCounter = new SourceCounter(getName());
      }
    }

The fix is to check whether the parent directory exists before creating it, and to skip the creation if it does. Change the contents of the try block as follows:
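    // Only create the parent directories if the path does not exist yet
    boolean exists = Files.exists(positionFile.getParent());
    if (!exists) {
      Files.createDirectories(positionFile.getParent());
    }

Build the module with Maven and replace flume-taildir-source-1.9.0.jar under flume/lib/ with the rebuilt jar, as shown in the figure.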
Rerun Flume: it now starts normally, as shown in the log in the figure below.
The data newly received in Kafka is shown in the figure below.
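Why does the existence check help? One likely explanation, stated here as an assumption because the original does not say so, is that /export/server/flume is a symbolic link. On JDK 8, Files.createDirectories checks an already existing path without following symbolic links, so a symlink to a directory makes it throw FileAlreadyExistsException, while Files.exists follows links by default and returns true. The Java sketch below builds that layout in a temporary directory (the names apache-flume-1.9.0-bin and flume are hypothetical) and runs both the original call and the patched check.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PositionDirDemo {

    // Original 1.9.0 behaviour: always call createDirectories on the parent.
    // Returns false if the call fails with FileAlreadyExistsException.
    static boolean originalCreate(Path parent) throws IOException {
        try {
            Files.createDirectories(parent);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    // Patched behaviour: Files.exists follows symbolic links by default,
    // so an existing link to a directory skips the creation step.
    // Returns whether the parent is usable as a directory afterwards.
    static boolean patchedCreate(Path parent) throws IOException {
        if (!Files.exists(parent)) {
            Files.createDirectories(parent);
        }
        return Files.isDirectory(parent);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("taildir-symlink-demo");
        // Hypothetical layout: a real install directory plus a "flume" symlink pointing to it.
        Path installDir = Files.createDirectory(base.resolve("apache-flume-1.9.0-bin"));
        Path flumeLink = Files.createSymbolicLink(base.resolve("flume"), installDir);

        System.out.println("original createDirectories succeeded: " + originalCreate(flumeLink));
        System.out.println("patched check left a usable directory: " + patchedCreate(flumeLink));
    }
}
```

On JDK 8 the first line should report false and the second true; the result of the original call may differ on other JDK versions, so treat that line as version-dependent.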
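Scenario 2 fix: in TailFile, change the following method

    public boolean updatePos(String path, long inode, long pos) throws IOException {
      if (this.inode == inode && this.path.equals(path)) {
        setPos(pos);
        updateFilePos(pos);
        logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
        return true;
      }
      return false;
    }

to:

    public boolean updatePos(String path, long inode, long pos) throws IOException {
      // Match on the inode only, so a renamed file keeps its saved position
      if (this.inode == inode) {
        setPos(pos);
        updateFilePos(pos);
        logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
        return true;
      }
      return false;
    }

In other words, only the inode is checked, not the file path. I have not verified this change in detail here; feel free to test it yourself. Note that ReliableTaildirEventReader.updateTailFiles in the same module also compares the stored path with the file's current path when deciding whether to reopen a file from the start, so it may need a matching change.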
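The effect of dropping the path check can be illustrated with a deliberately simplified model. The Java sketch below is hypothetical and is not Flume's code: the Position class and resumeOffset method only mirror the two matching rules above, "inode and path" versus "inode only". The inode 2496272 and offset 12 are borrowed from the sample position file; the paths are made up for the rotation case.

```java
import java.util.HashMap;
import java.util.Map;

public class ResumeRuleDemo {

    // One hypothetical position-file entry: inode, byte offset, path.
    static final class Position {
        final long inode;
        final long pos;
        final String path;

        Position(long inode, long pos, String path) {
            this.inode = inode;
            this.pos = pos;
            this.path = path;
        }
    }

    // Offset at which reading resumes for a file currently seen at `path` with `inode`.
    // Returns 0 (read from the beginning) when no saved entry matches the chosen rule.
    static long resumeOffset(Map<Long, Position> saved, long inode, String path,
                             boolean alsoCheckPath) {
        Position p = saved.get(inode);
        if (p == null) {
            return 0L;                 // unknown inode: a genuinely new file
        }
        if (alsoCheckPath && !p.path.equals(path)) {
            return 0L;                 // same inode under a new name: treated as a new file
        }
        return p.pos;                  // resume where reading stopped
    }

    public static void main(String[] args) {
        Map<Long, Position> saved = new HashMap<>();
        // Position saved before the daily rename (hypothetical path).
        saved.put(2496272L, new Position(2496272L, 12L, "/opt/module/applog/log/app.log"));

        // After the rename, the same inode shows up under a dated name (hypothetical).
        String renamed = "/opt/module/applog/log/app.log.2023-01-01";
        System.out.println("inode + path : " + resumeOffset(saved, 2496272L, renamed, true));
        System.out.println("inode only   : " + resumeOffset(saved, 2496272L, renamed, false));
    }
}
```

Running it prints 0 for the "inode + path" rule (the renamed file would be read again from the start, producing duplicates) and 12 for the "inode only" rule. A trade-off to keep in mind: inode numbers can be reused once a file is deleted, so matching on the inode alone is slightly less strict.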
Conclusion
If you have any questions or suggestions, you are welcome to get in touch. QQ: 806797785