做外贸无法登录国外网站怎么办,北京网页设计制作,动态的网站怎么做,ui设计师需要会什么Flume
Interceptor
概述
Interceptor(拦截器)本身是Source的子组件之一#xff0c;可以对数据进行拦截、过滤、替换等操作不同于Selector#xff0c;一个Source上可以配置多个Interceptor#xff0c;构成拦截器链。需要注意的是#xff0c;后一个拦截器不能和前一个拦截…Flume
Interceptor
概述
Interceptor(拦截器)本身是Source的子组件之一可以对数据进行拦截、过滤、替换等操作不同于Selector一个Source上可以配置多个Interceptor构成拦截器链。需要注意的是后一个拦截器不能和前一个拦截器的规则相反
Timestamp Interceptor 在Event的headers中添加一个timestamp字段来表示数据被收集的时间戳(单位是毫秒) 案例Event的header中自动添加上时间戳 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
# 给拦截器起名
a1.sources.s1.interceptors i1
# 配置Timestamp Interceptor
# 类型必须是timestamp
a1.sources.s1.interceptors.i1.type timestampa1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1Timestamp Interceptor结合HDFS Sink可以实现数据的按时间段存放。文件名后会添加上年月日并且每日的Event输出到当天对应的文件中。 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
a1.sources.s1.interceptors i1
a1.sources.s1.interceptors.i1.type timestampa1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path hdfs://hadoop01:9000/flume_data/logdate%Y-%m-%d
a1.sinks.k1.hdfs.rollInterval 3600
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 1000000000
a1.sinks.k1.hdfs.fileType DataStreama1.sources.s1.channels c1
a1.sinks.k1.channel c1Host Interceptor 在Event的headers中添加一个host字段用于标记这个数据的来源主机 案例 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
# 给拦截器起名。多个拦截器的命名必须在一行。不能写在两行
a1.sources.s1.interceptors i1 i2
# 配置Timestamp Interceptor
# 类型必须是timestamp
a1.sources.s1.interceptors.i1.type timestamp
# 配置Host Interceptor
# 类型必须是host
a1.sources.s1.interceptors.i2.type host
# 使用IP还是主机名
a1.sources.s1.interceptors.i2.useIP falsea1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1Static Interceptor 在Event的headers中添加指定的字段以及指定的值实际过程中用于做标记 案例 Event的header中自动添加 classflume这一key value a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
# 给拦截器起名
a1.sources.s1.interceptors i1 i2 i3
# 配置Timestamp Interceptor
# 类型必须是timestamp
a1.sources.s1.interceptors.i1.type timestamp
# 配置Host Interceptor
# 类型必须是host
a1.sources.s1.interceptors.i2.type host
# 使用IP还是主机名
a1.sources.s1.interceptors.i2.useIP true
# 配置Static Interceptor
# 类型必须是static
a1.sources.s1.interceptors.i3.type static
# 指定键
a1.sources.s1.interceptors.i3.key class
# 指定值
a1.sources.s1.interceptors.i3.value flumea1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1UUID Interceptor UUID计算产生一串编号由于编号位数比较多因此几乎不太可能产生重复的编号因此实际过程中经常使用UUID作为唯一标记 UUID Interceptor是在Event的headers中添加一个id字段来标记这个数据的唯一性 案例 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
# 给拦截器起名
a1.sources.s1.interceptors i1 i2 i3 i4
# 配置Timestamp Interceptor
# 类型必须是timestamp
a1.sources.s1.interceptors.i1.type timestamp
# 配置Host Interceptor
# 类型必须是host
a1.sources.s1.interceptors.i2.type host
# 使用IP还是主机名
a1.sources.s1.interceptors.i2.useIP true
# 配置Static Interceptor
# 类型必须是static
a1.sources.s1.interceptors.i3.type static
# 指定键
a1.sources.s1.interceptors.i3.key class
# 指定值
a1.sources.s1.interceptors.i3.value flume
# 配置UUID Interceptor
# 类型是UUIDInterceptor$Builder
a1.sources.s1.interceptors.i4.type org.apache.flume.sink.solr.morphline.UUIDInterceptor$Buildera1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1Search And Replace Interceptor 在使用的时候需要指定正则表达式会将满足正则表达式的数据替换为指定形式的数据。在替换的时候只替换body中的数据不替换headers中的数据 案例 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type http
a1.sources.s1.port 8888
a1.sources.s1.interceptors i1
# 类型必须是search_replace
a1.sources.s1.interceptors.i1.type search_replace
# 指定正则表达式
a1.sources.s1.interceptors.i1.searchPattern [0-9]
# 指定替换形式
a1.sources.s1.interceptors.i1.replaceString *a1.channels.c1.type memorya1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1Regex Filtering Interceptor 在使用的时候需要指定正则表达式。通过属性excludeEvents来决定过滤方式。如果excludeEvents的值为true表示符合正则表达式形式的数据会被过滤掉如果excludeEvents的值为false那么表示不符合正则表达式形式的数据会被过滤掉 案例 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8888
a1.sources.s1.interceptors i1
# 类型必须是regex_filter
a1.sources.s1.interceptors.i1.type regex_filter
# 指定正则表达式
a1.sources.s1.interceptors.i1.regex .*[0-9].*
# 指定过滤方式
a1.sources.s1.interceptors.i1.excludeEvents truea1.channels.c1.type memorya1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1Custom Interceptor
定义一个类实现Interceptor接口同时还需要提供内部类Builder
//模拟TimeStamp
public class AuthInterceptor implements Interceptor {Overridepublic void initialize() {}//单条处理Overridepublic Event intercept(Event event){MapString, String headers event.getHeaders();if(headers.containsKey(time)||headers.containsKey(timestamp)) return event;//时间格式可自定义headers.put(time, String.valueOf(System.currentTimeMillis()));return event;}//按批处理Overridepublic ListEvent intercept(ListEvent list) {// 定义集合存储处理之后的数据ListEvent es new ArrayList();for (Event event : es) {Event e intercept(event);es.add(e);}return es;}Overridepublic void close() {}//这个权限修饰符手动改为public 。 默认为default外部程序访问不到
public static class AuthBuilder implements Builder {// 通过这个函数来获取当前拦截器对象Overridepublic Interceptor build() {return new AuthInterceptor();}// 获取配置Overridepublic void configure(Context context) {}
}
}打成jar包然后放到Flume的lib目录下 cd /opt/software/flume-1.11.0/lib/
rz格式文件 cd ../data在文件中添加 a1.sources s1
a1.channels c1
a1.sinks k1a1.sources.s1.type netcat
a1.sources.s1.bind 0.0.0.0
a1.sources.s1.port 8080
a1.sources.s1.interceptors i1
a1.sources.s1.interceptors.i1.type com.fesco.in.AuthInterceptor$AuthBuildera1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 1000a1.sinks.k1.type loggera1.sources.s1.channels c1
a1.sinks.k1.channel c1**拦截器的功能**自动给headers加时间戳。将events按时间在channel中存储在不同文件。给Headers中自动添加该条events的来源主机号自定义一对kv添加到headers中。将body中的内容按正则匹配并调换。 将body中的内容过滤。 生成UUID唯一标识。
其他
执行流程 Source采集数据数据被采集之后会交给ChannelProcessor来处理ChannelProcessor收到数据之后会将数据交给Interceptor来进行过滤、拦截、替换等操作。需要注意的是可以存在多个Interceptor构成拦截器链Interceptor处理完数据之后会将数据交给Selector来进行分发。Selector有3种模式replicating、multipexing、load balancing。根据指定的模式将数据分发给对应的ChannelChannel收到数据之后会将数据推送给SinkProcessor。SinkProcessor本质上是一个SinkGroup需要将一个或者多个Sink绑定到一个组中支持三种模式default、failover和load balancingSinkProcessor收到数据之后将数据按照指定模式推送给SinkSink将数据写到目的地
扩展Flume监控 - Ganglia
概述
实际过程中可以使用Ganglia监控Flume的数据流。Ganglia是Berkeley发起的一个开源的集群监控项目可以检测数以千计的节点的性能Ganglia包含三个模块 gmond(Ganglia Monitoring Daemon)轻量级的监控服务需要监控哪一个节点的性能就在这个节点上安装gmond服务可以监控当前节点(系统)的各种指标数据CPU、内存、磁盘、网络等信息gmetad(Ganglia Meta Daemon)轻量级的汇合服务可以将监控信息以RRD格式来存储到磁盘上gweb(Ganglia Web)Ganglia提供的轻量级的可视化服务本身是使用PHP来开发的提供了WEB页面能够使得用户较为直观和简便的查看到节点的性能
安装 三个节点上都需要安装httpd和php服务 yum -y install httpd php三个节点上都需要安装rrd服务 yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel三个节点依赖Epel yum -y install epel-release第一个节点上安装Ganglia yum -y install ganglia-gmetad ganglia-gmond ganglia-web其他两个节点上安装gmond服务 yum -y install ganglia-gmond第一个节点上修改ganglia.conf vim /etc/httpd/conf.d/ganglia.conf文件修改如下 Location /ganglia# Require local# Require ip 10.1.2.3# Require host example.orgRequire all granted
/Location第一个节点上修改gmetad.conf vim /etc/ganglia/gmetad.conf修改data_source属性的值 data_source flume_cluster hadoop01三个节点上修改gmond.conf vim /etc/ganglia/gmond.conf修改cluster中的属性值 cluster {name flume_clusterowner unspecifiedlatlong unspecifiedurl unspecified
}修改udp_send_channel中的属性值 udp_send_channel {#bind_hostname yes # Highly recommended, soon to be default.# This option tells gmond to use a source address# that resolves to the machines hostname. Without# this, the metrics may appear to come from any# interface and the DNS names associated with# those IPs will be used to create the RRDs.# mcast_join 239.2.11.71# 将监控的信息发送到指定的节点收集host hadoop01port 8649ttl 1
}修改udp_recv_channel中的属性值 udp_recv_channel {# mcast_join 239.2.11.71port 8649# 接收任意主机的连接bind 0.0.0.0retry_bind true# Size of the UDP buffer. If you are handling lots of metrics you really# should bump it up to e.g. 10MB or even higher.# buffer 10485760
}三个节点启动gmond服务 systemctl start gmond
# 查看进程是否启动
ps -ef | grep -i gmond在一个节点上启动gmetad和httpd服务 systemctl start gmetad
systemctl start httpd在浏览器中输入http://IP/ganglia/
监控Flume 修改Flume的配置 # 进入Flume的配置目录
cd /opt/software/flume-1.11.0/conf/
# 复制文件
cp flume-env.sh.template flume-env.sh
# 编辑文件
vim flume-env.sh
# 在文件尾部添加
export JAVA_HOME/opt/software/jdk1.8
export JAVA_OPTS-Dflume.monitoring.typeganglia -Dflume.root.monitoring.hostshadoop01:8649 -Xms100m -Xmx200m
# 保存退出生效
source flume-env.sh启动Flume cd ../data//type ganglia type和之间有空格会报错//开启监控后在执行之间的格式文件就要用这个格式的指令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.loggerINFO,console -Dflume.monitoring.typeganglia -Dflume.monitoring.hostshadoop01:8649属性解释 属性解释ChannelCapacityChannel的容量ChannelFillPercentageChannel的利用率ChannelSizeChannel的大小EventPutAttemptCountPutList向Channel尝试推送数据的次数EventPutSuccessCountPutList向Channel推送数据成功的次数EventTakeAttemptCountTakeList向Sink推送数据的次数EventTakeSuccessCountTakeList向Sink推送数据成功的次数StartTime起始时间StopTime结束时间