Java 实现日志文件监听并读取相关数据
项目需求
由于所在数据中台项目组需要实现监听文件夹或者日志文件并读取对应格式的脏数据的需求,以便在文件、文件夹发生变化时进行相应的业务流程;所以在这里记录下相关业务的实现及技术选型。
Apache Commons-IO
首先需要添加对应依赖:
<dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.7</version></dependency>
版本可自行选择,这里需要注意的是 2.7 版本及以上需要 Java 8
官网截图:
Apache Commos IO中的 org.apache.commons.io.monitor
包提供了文件系统监听的功能。
核心知识
一句话总结:通过观察者模式以及事件监听机制,以 FileAlterationObserver
为核心,再通过 FileAlterationListener
,FileAlterationMonitor
,就可以实现对文件系统的事件监听。
整体流程:
自定义文件监听类并继承 FileAlterationListenerAdaptor
实现对文件与目录的创建,修改,删除事件的处理
自定义文件监控类,通过指定目录创建一个观察者 FileAlterationObserver
向此监视器添加文件系统观察器,并添加文件监听器
通过 ApplicationRunner
或者 CommandLineRunner
调用并执行
代码实现
文件监听类:
/** * 文件监听类 * @author Greenarrow * @date 2022-05-12 14:43 **/public class FileListener extends FileAlterationListenerAdaptor { private static Logger logger = LoggerFactory.getLogger(FileListener.class); @Override public void onStart(FileAlterationObserver observer) { super.onStart(observer); logger.info("onStart"); } @Override public void onDirectoryCreate(File directory) { logger.info("[新建]:" + directory.getAbsolutePath()); } @Override public void onDirectoryChange(File directory) { logger.info("[修改]:" + directory.getAbsolutePath()); } @Override public void onDirectoryDelete(File directory) { logger.info("[删除]:" + directory.getAbsolutePath()); } @Override public void onFileCreate(File file) { String compressedPath = file.getAbsolutePath(); logger.info("[新建]:" + compressedPath); List<String> contentList = null; try { if (file.canRead()){ // 将文件按行读取为字符串集合 contentList = FileUtils.readLines(new File(compressedPath), StandardCharsets.UTF_8); if (CollectionUtil.isNotEmpty(contentList)){ // 获取对应格式的数据并输出,这里可自行添加业务处理 List<String> dirtyRecord = contentList.stream().filter(s -> s.startsWith("{") && s.endsWith("}")).collect(Collectors.toList()); dirtyRecord.forEach(System.out::println); } } } catch (IOException e) { e.printStackTrace(); logger.error("读取文件内容失败",e); } } @Override public void onFileChange(File file) { String compressedPath = file.getAbsolutePath(); logger.info("[修改]:" + compressedPath); } @Override public void onFileDelete(File file) { logger.info("[删除]:" + file.getAbsolutePath()); } @Override public void onStop(FileAlterationObserver observer) { super.onStop(observer); logger.info("onStop"); }}
文件监控类:
/** * 文件监听测试 demo * @author Greenarrow * @date 2022-05-12 14:45 **/public class FileMonitor { FileAlterationMonitor monitor = null; public FileMonitor(long interval) throws Exception { monitor = new FileAlterationMonitor(interval); } /** * 给文件添加监听 * @param path * @param listener */ public void monitor(String path, FileAlterationListener listener) { FileAlterationObserver observer = new FileAlterationObserver(new File(path)); monitor.addObserver(observer); observer.addListener(listener); } public void stop() throws Exception { monitor.stop(); } public void start() throws Exception { monitor.start(); }}
自定义 Runner 并实现 CommandLineRunner
:
/** * 项目启动之后开启文件监听功能 * @author Greenarrow * @date 2022-05-12 10:02 **/@Component// @Order(Integer.MIN_VALUE)public class DirtyRecordRunner implements CommandLineRunner { private static Logger logger = LoggerFactory.getLogger(DirtyRecordRunner.class); @Value("${test}") private String path; @Override public void run(String... args) throws Exception { logger.info(this.getClass().getName()+"[开启文件夹监听功能]"); FileMonitor fileMonitor = new FileMonitor(1000); fileMonitor.monitor(path,new FileListener()); fileMonitor.start(); }}
总结
文件、文件夹监听功能较简单,实现方式可自行选择
需要注意的是,通过 Runner 的方式在项目启动之后开启相关监听功能,此方式存在缺陷,只能调用一次,出现异常就会停止,除非项目重启,这里需要通过相关补偿机制来实现
END -
原文:https://juejin.cn/post/7100926619958657037