域名访问网站是什么意思,域名备案需要多少时间,wordpress 改成动态,营销推广网背景描述：
为了满足linux服务器上特定目录的非结构化文件的实时监控，并上传HDFS 使用的方法
Apache的Commons-IO，来实现文件的监控功能
所需要的pom
dependenciesdependencygroupIdorg.apache.hadoop/groupI…背景描述
为了满足linux服务器上特定目录的非结构化文件的实时监控，并上传HDFS 使用的方法
Apache的Commons-IO，来实现文件的监控功能
所需要的pom
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>1.3.9</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.26</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.22</version>
    </dependency>
</dependencies>
public static void copyFile2HDFS(URI hdfsURI, String username, String srcPath, String newPath) {try {Configuration conf new Configuration();FileSystem fs FileSystem.get(hdfsURI, conf, username);Path src new Path(srcPath);Path dst new Path(newPath);if (fs.exists(dst)) {fs.delete(dst, true);}fs.copyFromLocalFile(src, dst);fs.close();System.out.println(Upload Successfully!);} catch (Exception e) {e.printStackTrace();StaticLog.info(复制文件失败{}, e.getMessage());}}
public static String getHDFSPath(File file) {// 判断文件格式包括视频、图片、文本和音频等你可以根据实际需求进行修改String fileName file.getName();String extension fileName.substring(fileName.lastIndexOf(.) 1).toLowerCase();if (extension.equals(mp4) || extension.equals(avi) || extension.equals(mov)) {return /data/shipin/ file.getName();} else if (extension.equals(jpg) || extension.equals(png)) {return /data/txt/ file.getName();} else if (extension.equals(m4a) || extension.equals(wav)) {return /data/yuyin/ file.getName();} else if (extension.equals(txt)) {return /data/wenjian/ file.getName();} else {return /data/ file.getName();}}
FileMonitorTest.java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.xxx.fileSync;

import java.util.concurrent.TimeUnit;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;public class FileMonitorTest {public FileMonitorTest() {}public static void main(String[] arugs) throws Exception {String absolateDir /opt/xxxx;long intervalTime TimeUnit.SECONDS.toMillis(5L);new FileAlterationObserver(absolateDir, FileFilterUtils.and(new IOFileFilter[]{FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(.success)}));FileAlterationObserver observer new FileAlterationObserver(absolateDir);observer.addListener(new FileListener());FileAlterationMonitor monitor new FileAlterationMonitor(intervalTime, new FileAlterationObserver[]{observer});monitor.start();}
}FileListener.java重写方法
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.xxx.fileSync;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class FileListener extends FileAlterationListenerAdaptor {private static final Logger log LoggerFactory.getLogger(FileListener.class);URI uri new URI(hdfs://xxxxx:802xx0);String newPath ;String newHDFSPath ;String userName root;public FileListener() throws URISyntaxException {}public void onStart(FileAlterationObserver observer) {super.onStart(observer);}public void onDirectoryCreate(File directory) {this.newPath /data directory.getName();System.out.println(文件路径 directory.getAbsolutePath() 文件夹创建 directory.getName());FileUtil.newDir2HDFS(this.uri, this.userName, this.newPath);log.info([Deleted Directory] : {}, directory.getAbsolutePath());}public void onDirectoryChange(File directory) {log.info([Changed Directory] : {}, directory.getAbsolutePath());}public void onDirectoryDelete(File directory) {log.info([Created Directory] : {}, directory.getAbsolutePath());}public void onFileCreate(File file) {try {log.info([Created File] : {}, file.getAbsolutePath());this.newHDFSPath FileUtil.getHDFSPath(file);this.newPath FileUtil.getDestPath(file);System.out.println(监控源文件路径 file.toPath());System.out.println(监控源文件路径 file.getAbsolutePath() 目标HDFS文件创建 this.newHDFSPath);System.out.println(监控源文件路径 file.getAbsolutePath() 目标Linux文件创建 this.newPath);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newHDFSPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileChange(File file) {try {log.info([Amended File] : {}, file.getAbsolutePath());this.newPath FileUtil.getDestPath(file);FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newPath);Files.copy(file.toPath(), (new File(this.newPath)).toPath(), StandardCopyOption.REPLACE_EXISTING);} catch (Throwable var3) {throw var3;}}public void onFileDelete(File file) {try {log.info([Deleted File] : {}, file.getAbsolutePath());this.newHDFSPath 
FileUtil.getHDFSPath(file);this.newPath FileUtil.getDestPath(file);FileUtil.delFile2HDFS(this.uri, this.userName, this.newHDFSPath);Files.delete((new File(this.newPath)).toPath());} catch (Throwable var3) {throw var3;}}public void onStop(FileAlterationObserver observer) {super.onStop(observer);}
}