当前位置: 首页 > news >正文

麻章网站开发公司wordpress发文章摘要

麻章网站开发公司,wordpress发文章摘要,平面设计培训哪个好,中国沈阳app在哪里下载前置知识#xff1a; Flume学习笔记#xff08;1#xff09;—— Flume入门-CSDN博客 Flume学习笔记#xff08;2#xff09;—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析#xff1a;使用 Flume 采集服务器本地日志#xff0c;需要按照日志… 前置知识 Flume学习笔记1—— Flume入门-CSDN博客 Flume学习笔记2—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不同的分析系统 需要使用Flume 拓扑结构中的 Multiplexing 结构Multiplexing的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channel中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值 实现流程 代码 导入依赖 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version/dependency /dependencies 自定义拦截器的代码 package com.why.interceptor;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList; import java.util.List; import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private ListEvent addHeaderEvents;Overridepublic void initialize() {//初始化集合addHeaderEvents new ArrayList();}//单个事件拦截Overridepublic Event intercept(Event event) {//获取头信息MapString, String headers event.getHeaders();//获取body信息String body new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains(why)){headers.put(type,first);}else {headers.put(type,second);}return event;}//批量事件拦截Overridepublic ListEvent intercept(ListEvent events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{Overridepublic Interceptor build() {return new TypeInterceptor();}Overridepublic void configure(Context context) {}}} 将代码打包放入flume安装路径下的lib文件夹中 配置文件 在job文件夹下创建group4目录添加配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source1 个 sink group2 个 avro sink并配置相应的 ChannelSelector 和 interceptor # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.why.interceptor.TypeInterceptor$TypeBuilder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2# Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 hadoop103配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 hadoop104配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 执行指令 hadoop102bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf hadoop103bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.loggerINFO,console hadoop104bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.loggerINFO,console 然后hadoop102通过nc连接44444端口发送数据 在hadoop103和104上分别接收到 自定义 Source 官方提供的文档Flume 1.11.0 Developer Guide — Apache Flume 给出的示例代码如下 public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external client}Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e getSomeData();// Store the Event into this Sources associated Channel(s)getChannelProcessor().processEvent(e);status Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;} } 需要继承AbstractSource实现Configurable, PollableSource 实战需求分析 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文件中配置 代码 package com.why.source;import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource;import java.util.HashMap; import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel这个方法将被循环调用Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMapString,String headerMap new HashMap();//创建事件SimpleEvent event new SimpleEvent();//循环封装事件for (int i 0; i 5; i) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context读取配置文件内容Overridepublic void configure(Context context) {delay context.getLong(delay);field context.getString(field,Hello);}}打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type com.why.source.MySource a1.sources.r1.delay 1000 a1.sources.r1.field why# Describe the sink a1.sinks.k1.type logger# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.loggerINFO,console 结果如下 自定义 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件 官方文档Flume 1.11.0 Developer Guide — Apache Flume 接口实例 public class MySink extends AbstractSink implements Configurable {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;// Start transactionChannel ch getChannel();Transaction txn ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;} } 自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口 实战需求分析 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置 代码 package com.why.sink;import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch getChannel();//获取事务Transaction txn ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件直到读取到事件结束循环while (true) {event ch.take();if (event ! null) {break;}}try {//处理事件打印LOG.info(prefix new String(event.getBody()) suffix);//事务提交txn.commit();status Status.READY;} catch (Exception e) {//遇到异常事务回滚txn.rollback();status Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}Overridepublic void configure(Context context) {prefix context.getString(prefix, hello);suffix context.getString(suffix);} }打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444# Describe the sink a1.sinks.k1.type com.why.sink.MySink a1.sinks.k1.prefix why: a1.sinks.k1.suffix :why# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.loggerINFO,console 结果如下
http://www.zqtcl.cn/news/956613/

相关文章:

  • 中企动力建站怎么样网站建设与设计的心得体会
  • 打开网站出现directoryj2ee做网站
  • 如何建设一个视频网站西安个人做网站
  • wordpress站群教程市场营销培训课程
  • 17网站一起做网店白沟简单网页制作图片
  • 网站建设项目需求分析流程做商业地产的网站
  • 百度建站商业网点的定义
  • 古镇建设网站经济研究院网站建设方案
  • 会员网站开发百度自己的宣传广告
  • 重庆网络推广网站推广自己设计图纸的软件
  • 国内免费的短视频素材网站什么网站做博客好
  • 个体户网站建设wordpress修改作者链接
  • 做企业网站怎么样如何做网站的登录注册
  • 网站建设中标怎么做网站文字图片
  • 济南网站推广徽hyhyk1公司展示网站模板
  • ae免费模板下载网站视频网站数据库设计
  • 找做金融的网站网站建设方面存在的问题
  • 门户网站建设与开发wordpress添加文章总数标签总数
  • 想创办一个本地的人才招聘网站_如何做市场调查问卷windows7优化大师下载
  • 做网站建设要什么证视频付费网站建设
  • html网站建设实例代码软件下载app排行榜
  • 高端个人网站网站建设密码
  • 全网seo秦皇岛市做网站优化
  • 简述站点推广有哪些方式大兴做网站公司
  • 网站关键词密度查询太仓网站设计早晨设计
  • 厦门市同安区建设局官方网站永嘉网站建设
  • 工程师网站建设网页设计与制作基础教程答案
  • php 开发手机网站建设互动平台抽手机
  • 网站 被降权网页平面设计要学什么
  • 团购网站短信平台中国建设银行网站客户注册码