当前位置: 首页 > news >正文

新闻类网站备案怎么建免费企业官网站

新闻类网站备案,怎么建免费企业官网站,wordpress 最受欢迎主题,网站服务器 内存flume扩展实战#xff1a;自定义拦截器、Source 与 Sink 全指南 Flume 内置的组件虽然能满足大部分场景#xff0c;但在复杂业务需求下#xff08;如特殊格式数据采集、定制化数据清洗#xff09;#xff0c;需要通过自定义组件扩展其功能。本文将详细讲解如何自定义 Flu…flume扩展实战自定义拦截器、Source 与 Sink 全指南 Flume 内置的组件虽然能满足大部分场景但在复杂业务需求下如特殊格式数据采集、定制化数据清洗需要通过自定义组件扩展其功能。本文将详细讲解如何自定义 Flume 拦截器、Source 和 Sink从代码实现到配置部署带你掌握 Flume 扩展的核心技巧。 扩展基础开发环境与依赖 自定义 Flume 组件需基于 Flume 核心 API 开发需提前准备 依赖配置 在 pom.xml 中添加 Flume 核心依赖以 1.9.0 为例 dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-core/artifactId version1.9.0/version scopeprovided/scope !-- 运行时由 Flume 环境提供 -- /dependency 核心接口 Flume 扩展的核心是实现官方定义的接口各组件对应的接口如下 组件类型需实现的接口 / 继承的类核心方法拦截器org.apache.flume.interceptor.Interceptorintercept(Event) 处理单个事件Source继承 AbstractSource实现 PollableSourceprocess() 产生并发送事件Sink继承 AbstractSink实现 Configurableprocess() 从 Channel 消费事件 实战一自定义拦截器Interceptor 拦截器用于在数据从 Source 到 Channel 前对 Event 进行加工如添加元数据、过滤无效数据。以下案例实现一个按内容分类的拦截器为不同类型的 Event 添加 type 头信息。 1.代码实现 通过实现org.apache.flume.interceptor.Interceptor来自定义自己的拦截器 public class MyInterceptor implements Interceptor {Overridepublic void initialize() {}/*** 单个事件拦截* param event* return*/Overridepublic Event intercept(Event event) {// 获取头信息MapString,String headers event.getHeaders();// 获取数据String body new String(event.getBody());// 按 Body 前缀分类 if (body.startsWith(number:)) { headers.put(type, number); // 数字类型 } else if (body.startsWith(log:)) { headers.put(type, log); // 日志类型 } else { headers.put(type, other); // 其他类型 } return event; // 返回处理后的 Event}/*** 批量事件拦截* param list* return*/Overridepublic ListEvent intercept(ListEvent list) {for (Event event : events) { intercept(event); } return events; }Overridepublic void close() {}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new MyInterceptor();}Overridepublic void configure(Context context) {// 从配置文件读取参数如无参数可空实现 }}}2. 打包与部署 将代码打包为 JAR如 flume-custom-interceptor.jar将 JAR 复制到 Flume 安装目录的 lib 文件夹下确保 Flume 能加载类。 3. 配置使用拦截器 在 Flume 配置文件中引用自定义拦截器并结合 Multiplexing Channel Selector 实现按类型路由 # 定义组件 agent.sources customSource agent.channels numChannel logChannel otherChannel agent.sinks numSink logSink otherSink # 配置 Source 并启用拦截器 agent.sources.customSource.type seq #拦截器名称 agent.sources.mySource.interceptors myInterceptor # 配置拦截器注意格式包名类名$Builder agent.sources.mySource.interceptors.myInterceptor.type com.zhanghe.study.custom_flume.interceptor.MyInterceptor$Builder # 配置 Channel 选择器按 type 头信息路由 agent.sources.customSource.selector.type multiplexing # 按 Header 中的 type 字段路由 agent.sources.customSource.selector.header type # typenumber → numChannel agent.sources.customSource.selector.mapping.number numChannel # typelog → logChannel agent.sources.customSource.selector.mapping.log logChannel # 默认路由 agent.sources.customSource.selector.default otherChannel # 配置 Channel内存通道 agent.channels.numChannel.type memory agent.channels.logChannel.type memory agent.channels.otherChannel.type memory # 配置 Sink输出到控制台日志 agent.sinks.numSink.type logger agent.sinks.logSink.type logger agent.sinks.otherSink.type logger # 绑定关系 agent.sources.customSource.channels numChannel logChannel otherChannel agent.sinks.numSink.channel numChannel agent.sinks.logSink.channel logChannel agent.sinks.otherSink.channel otherChannel 4. 验证效果 启动 Flume 后序列生成器会产生事件拦截器会按内容添加 type 头信息最终不同类型的事件会路由到对应的 Channel 和 Sink控制台会输出分类后的日志。 实战二自定义Source 自定义 Source 用于从特殊数据源如自研系统、专有协议采集数据。以下案例实现一个周期性生成自定义事件的 Source。 1. 代码实现 自定义的Source需要继承AbstractSource实现Configurable和PollableSource接口 import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; import org.apache.flume.source.PollableSource; import java.util.concurrent.atomic.AtomicInteger; public class MySource extends AbstractSource implements PollableSource, Configurable { private String prefix; // 自定义前缀从配置文件读取 private AtomicInteger counter new AtomicInteger(0); // 计数器 // 从配置文件读取参数 Override public void configure(Context context) { // 读取配置参数默认值为 custom prefix context.getString(prefix, custom); } // 核心方法产生事件并发送到 Channel Override public Status process() throws EventDeliveryException { Status status Status.READY; try { // 生成自定义事件内容 String data prefix : counter.incrementAndGet(); Event event EventBuilder.withBody(data.getBytes()); // 将事件发送到 Channel通过 ChannelProcessor getChannelProcessor().processEvent(event); Thread.sleep(1000); // 每秒生成一个事件 } catch (Exception e) { status Status.BACKOFF; // 失败时返回 BACKOFF if (e instanceof Error) { throw (Error) e; } } return status; } // 失败重试间隔增量默认 0 即可 Override public long getBackOffSleepIncrement() { return 0; } // 最大重试间隔默认 0 即可 Override public long getMaxBackOffSleepInterval() { return 0; } }2. 配置使用自定义 Source # 定义组件 agent.sources customSource agent.channels memoryChannel agent.sinks loggerSink # 配置自定义 Source agent.sources.mySource.type com.zhanghe.study.custom_flume.source.MySource # 自定义参数对应代码中的 prefix agent.sources.customSource.prefix mydata # 配置 Channel 和 Sink复用之前的配置 agent.channels.memoryChannel.type memory agent.sinks.loggerSink.type logger # 绑定关系 agent.sources.customSource.channels memoryChannel agent.sinks.loggerSink.channel memoryChannel 实战三自定义Sink 自定义 Sink 用于将数据发送到特殊目标如专有存储、API 接口。以下案例实现一个将事件内容输出到指定文件的 Sink。 1. 代码实现 自定义的Sink需要继承AbstractSink类,实现Configurable接口 import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; public class MySink extends AbstractSink implements Configurable { private static final Logger logger LoggerFactory.getLogger(FileSink.class); private String filePath; // 输出文件路径 private PrintWriter writer; // 从配置文件读取参数 Override public void configure(Context context) { filePath context.getString(filePath); // 必须配置文件路径 if (filePath null) { throw new IllegalArgumentException(filePath 配置不能为空); } } // 启动 Sink 时初始化文件写入流 Override public void start() { try { writer new PrintWriter(new FileWriter(filePath, true)); // 追加模式 } catch (IOException e) { logger.error(初始化文件写入流失败, e); throw new FlumeException(e); } super.start(); } // 核心方法从 Channel 读取事件并处理 Override public Status process() throws EventDeliveryException { Status status Status.READY; Channel channel getChannel(); Transaction txn channel.getTransaction(); // 开启事务 try { txn.begin(); // 事务开始 Event event channel.take(); // 从 Channel 读取事件 if (event ! null) { // 将事件内容写入文件 String data new String(event.getBody()); writer.println(data); writer.flush(); // 立即刷新 } else { status Status.BACKOFF; // 无事件时返回 BACKOFF } txn.commit(); // 事务提交 } catch (Exception e) { txn.rollback(); // 失败时回滚事务 status Status.BACKOFF; if (e instanceof Error) { throw (Error) e; } } finally { txn.close(); // 关闭事务 } return status; } // 停止时关闭资源 Override public void stop() { if (writer ! null) { writer.close(); } super.stop(); } } 2. 配置使用自定义 Sink # 定义组件 agent.sources seqSource agent.channels memoryChannel agent.sinks fileSink # 配置 Source使用序列生成器 agent.sources.seqSource.type seq # 配置自定义 Sink agent.sinks.fileSink.type com.zhanghe.study.custom_flume.sink.MySink # 输出文件路径 agent.sinks.fileSink.filePath /tmp/flume-custom-sink.log # 配置 Channel agent.channels.memoryChannel.type memory # 绑定关系 agent.sources.seqSource.channels memoryChannel agent.sinks.fileSink.channel memoryChannel 扩展注意事项与最佳实践 1. 可靠性保障 事务支持自定义 Source/Sink 必须严格遵循 Flume 事务机制如 Sink 需通过 Transaction 操作 Channel避免数据丢失异常处理对可能的异常如 IO 错误、网络超时进行捕获并返回 Status.BACKOFF 触发重试。 2. 性能优化 批量处理在 intercept(ListEvent) 和 process() 中支持批量处理减少函数调用开销参数可配置通过 Context 读取配置参数如批量大小、重试次数避免硬编码。 3. 调试与监控 日志输出使用 SLF4J 日志框架输出关键步骤如事件处理结果、异常信息指标暴露通过 Flume 的 MetricSupport 接口暴露自定义指标如处理事件数、失败数便于监控。 4. 版本兼容性 确保自定义组件依赖的 Flume 版本与部署环境一致避免因 API 变更导致兼容性问题。 参考文献 flume扩展
http://www.zqtcl.cn/news/645183/

相关文章:

  • 一键网站制作机关网站建设建议
  • 快站公众号工具台州网站制作系统分析怎么写
  • 品牌网站制作方案如何写推广软文
  • o2o营销seo薪酬如何
  • 网站开发公司 网站空间推广网站制作
  • 鞍山网站制作小程序WordPress网盘下载插件
  • 保山市建设厅官方网站郑州建设信息网站
  • clh网站建设公司h5网站源代码
  • 做装修的网站怎么做好服装市场调网站建设的目的
  • 佛山网站建站电子工程网名又知道你是做工程
  • 桐乡网站二次开发商城购物网站建设
  • 大连微网站制作公司网页多钱
  • 郑州网站托管助企网络营销推广合作
  • 做电商网站用什么软件企业网站建设方案范本
  • o2o商城网站搭建潍坊定制网站搭建
  • 网站建设费用说明青岛网站建设方案公司
  • 佛山市建设企业网站服务机构优化seo是什么
  • 仿70网站分类目录源码市场营销策划ppt免费模板
  • 广东圆心科技网站开发网站模板设计网页程序代码
  • 网站平台定制开发一级a做爰网站下载
  • 网站如何做流媒体wordpress导出软件
  • 电商网站流程图esp8266做网站
  • 细胞医疗 网站模版免费网址软件
  • app地推网企业seo解决方案
  • php网站转移网吧手机网站模版
  • 北京建设教育网站今天的国内新闻
  • 江苏省建设银行网站天心区网站建设公司
  • 网站分享设计网站备案收费么
  • 手机网站专题关于asp sql网站开发的书籍
  • 网站建设属于什么领域小米发布会在哪里看