Flink article series
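1. Flink deployment, concepts, source/transformation/sink usage examples, the four cornerstones with examples, and links to the other articles in the series
13. Flink Table API and SQL: basic concepts, common API overview, and introductory examples
14. Flink Table API and SQL data types: built-in data types and their properties
15. Flink Table API and SQL streaming concepts: dynamic tables, time attribute configuration (handling updating results), temporal tables, joins on streams, determinism on streams, and query configuration
16. Flink Table API and SQL, connecting to external systems: connectors and formats for reading and writing external systems, FileSystem example (1)
16. Flink Table API and SQL, connecting to external systems: connectors and formats for reading and writing external systems, Elasticsearch example (2)
16. Flink Table API and SQL, connecting to external systems: connectors and formats for reading and writing external systems, Apache Kafka example (3)
16. Flink Table API and SQL, connecting to external systems: connectors and formats for reading and writing external systems, JDBC example (4)
16. Flink Table API and SQL, connecting to external systems: connectors and formats for reading and writing external systems, Apache Hive example (6)
17. Flink Table API: supported operations (1)
17. Flink Table API: supported operations (2)
18. Operations and syntax supported by Flink SQL
19. Built-in functions in the Flink Table API and SQL, with examples (1)
19. User-defined functions in the Flink Table API and SQL, with examples (2)
19. User-defined functions in the Flink Table API and SQL, with examples (3)
19. User-defined functions in the Flink Table API and SQL, with examples (4)
20. Flink SQL Client: try Flink SQL without writing code and submit SQL jobs directly to a cluster
21. Flink Table API and DataStream API integration (1): introduction, introductory example, and integration notes
21. Flink Table API and DataStream API integration (2): batch mode and insert-only stream processing
21. Flink Table API and DataStream API integration (3): changelog stream processing, pipeline example, type conversion, and legacy conversion examples
21. Flink Table API and DataStream API integration: complete version
22. Flink Table API and SQL: DDL for creating tables
24. Flink Table API and SQL Catalogs (1): introduction, types, DDL via the Java API and SQL, catalog operations via the Java API and SQL
24. Flink Table API and SQL Catalogs (2): database and table operations via the Java API
24. Flink Table API and SQL Catalogs (3): view operations via the Java API
24. Flink Table API and SQL Catalogs (4): partition and function operations via the Java API
25. Flink Table API and SQL functions (user-defined function examples)
26. Flink SQL: overview and introductory examples
27. Flink SQL SELECT (1): select, where, distinct, order by, limit, set operations, and deduplication, with detailed examples
27. Flink SQL SELECT (2): SQL Hints and Joins, with detailed examples
27. Flink SQL SELECT (3): window functions, with detailed examples
27. Flink SQL SELECT (4): window aggregation, with detailed examples
27. Flink SQL SELECT (5): Group Aggregation, Over Aggregation, and Window Join, with detailed examples
27. Flink SQL SELECT (6): Top-N, Window Top-N, and Window Deduplication, with detailed examples
27. Flink SQL SELECT (7): Pattern Recognition, with detailed examples
28. Flink SQL: DROP, ALTER, INSERT, and ANALYZE statements
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB statements, UPDATE, DELETE (1)
29. Flink SQL: DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB statements, UPDATE, DELETE (2)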
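30. Flink SQL Client: configuration file usage (tables, views, and so on), illustrated with Kafka and filesystem examples
31. Flink SQL Gateway: introduction and examples
32. Flink Table API and SQL: user-defined Sources and Sinks, implementation and detailed examples
33. Time zones in the Flink Table API and SQL
35. Flink Formats: CSV and JSON
36. Flink Formats: Parquet and Orc
41. Flink Hive dialect: introduction and detailed examples
40. Flink Apache Kafka connector (1): Kafka source introduction and usage examples
40. Flink Apache Kafka connector (2): Kafka sink introduction and usage examples
40. Flink Apache Kafka connector: Kafka source and sink, complete version
42. Flink Table API and SQL: Hive Catalog
43. Flink Hive reads and writes, with detailed verification examples
44. Flink modules: introduction and usage examples, plus detailed examples of using Hive built-in and user-defined functions in Flink SQL (some claims found online appear to be wrong)
45. Flink metric system, introduction and verification (1): metric types and implementation examples
45. Flink metric system, introduction and verification (2): metric scope, reporters, system metrics, tracing, API integration examples, and dashboard integration
45. Flink metric system, introduction and verification (3): complete version
46. Flink Table API and SQL: configuration options list with examples

Contents
Flink article series
I. Flink metric system
1. Registering metrics
1. Metric types
2. Counter
3. Gauge
4. Histogram
5. Meter

This article briefly introduces the first part of the Flink metric system: the metric types, with example code for each of the four types.
The topic is split into three parts:
45. Flink metric system, introduction and verification (1): metric types and implementation examples
45. Flink metric system, introduction and verification (2): metric scope, reporters, system metrics, tracing, API integration examples, and dashboard integration
45. Flink metric system, introduction and verification (3): complete version
The examples in this article require a working nc (netcat).
The article has five parts: the metric types, followed by code implementations of the four metrics (counter, gauge, histogram, and meter).
The examples were run on Flink 1.17.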
I. Flink metric system
Flink exposes a metric system that allows metrics to be gathered and exposed to external systems.
Maven dependencies used in this article:

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

1. Registering metrics
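You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.
1. Metric types
Flink supports four metric types: Counters, Gauges, Histograms, and Meters.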
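2. Counter
A Counter is used to count something. The current value can be incremented or decremented with inc()/inc(long n) or dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup. The TestMetricsDemo program in this section shows several implementation variants for reference.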
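Before the Flink program, the inc/dec semantics can be sketched in isolation. SketchCounter below is a hypothetical class that only mirrors the method names of Flink's Counter interface (inc, dec, getCount); it does not use or depend on Flink, and a real job would obtain its counter from MetricGroup.counter(...) instead.

```java
// Hypothetical stand-in that mirrors the method shape of Flink's Counter
// interface; it is NOT the Flink class and needs no Flink dependency.
class SketchCounter {
    private long count;

    public void inc() { count++; }
    public void inc(long n) { count += n; }
    public void dec() { count--; }
    public void dec(long n) { count -= n; }
    public long getCount() { return count; }

    // Counts the comma-separated words of each line with inc(long n).
    public static long countWords(String... lines) {
        SketchCounter words = new SketchCounter();
        for (String line : lines) {
            words.inc(line.split(",").length);
        }
        return words.getCount();
    }

    public static void main(String[] args) {
        SketchCounter lineCounter = new SketchCounter();
        lineCounter.inc();      // 1
        lineCounter.inc(5);     // 6
        lineCounter.dec();      // 5
        lineCounter.dec(2);     // 3
        System.out.println(lineCounter.getCount()); // prints 3
        // Same three input lines as the verification data used in this article.
        System.out.println(countWords("hello,123", "alan,flink,good", "alan_chan,hi,flink")); // prints 8
    }
}
```

The sketch is single-threaded on purpose; whether a counter needs to be thread-safe depends on how the operator that owns it is used.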
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

    // Variant 1: a standalone RichMapFunction that counts every record it sees.
    public static class LineMapper extends RichMapFunction<String, String> {
        private transient Counter counter;

        @Override
        public void open(Configuration config) {
            this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter");
        }

        @Override
        public String map(String value) throws Exception {
            this.counter.inc();
            return value;
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation: plain word count
        DataStream<Tuple2<String, Integer>> result = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(1);

        // Variant 2 (disabled): count the elements handled by each subtask.
        // SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = lines
        //         .map(new RichMapFunction<String, Tuple2<Integer, Integer>>() {
        //             @Override
        //             public Tuple2<Integer, Integer> map(String value) throws Exception {
        //                 int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); // subtask id / partition number
        //                 return new Tuple2<>(subTaskId, 1);
        //             }
        //         })
        //         // group by subtask id and count the elements in each subtask
        //         .keyBy(t -> t.f0)
        //         .sum(1);

        // Tuple3<String, Long, Integer>: word, number of input lines seen so far, word count
        DataStream<Tuple3<String, Long, Integer>> result2 = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                    private transient Counter counter;

                    @Override
                    public void open(Configuration config) {
                        this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:");
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Increment the registered Counter itself so that reporters see the value.
                        // Copying getCount() into a local long and incrementing the copy
                        // would leave the metric at 0.
                        this.counter.inc();
                        System.out.println("line count=" + this.counter.getCount());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, this.counter.getCount()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");
        result2.print("result2:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();

        // Variant 3 (disabled): the same idea on a bounded in-memory source.
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        // DataStream<String> input = env.fromElements("a", "b", "c", "a", "b", "c");
        //
        // input.keyBy(value -> value).map(new RichMapFunction<String, String>() {
        //     private long count = 0;
        //
        //     @Override
        //     public void open(Configuration parameters) throws Exception {
        //         super.open(parameters);
        //         count = getRuntimeContext().getMetricGroup().counter("myCounter").getCount();
        //     }
        //
        //     @Override
        //     public String map(String value) throws Exception {
        //         count++;
        //         return value + ":" + count;
        //     }
        // }).print();
        //
        // env.execute("Flink Count Counter Example");
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=1
result:> (hello,1)
result2:> (hello,1,1)
result:> (123,1)
result2:> (123,1,1)
line count=2
result2:> (alan,2,1)
result:> (alan,1)
result2:> (flink,2,1)
result:> (flink,1)
result2:> (good,2,1)
result:> (good,1)
line count=3
result:> (alan_chan,1)
result2:> (alan_chan,3,1)
result:> (hi,1)
result2:> (hi,3,1)
result:> (flink,2)
result2:> (flink,2,2)

Alternatively, you can use your own Counter implementation:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, Long, Integer>: word, counter value when the line arrived, word count
        DataStream<Tuple3<String, Long, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                    private transient Counter counter;

                    @Override
                    public void open(Configuration config) {
                        this.counter = getRuntimeContext().getMetricGroup()
                                .counter("result2LineCounter", new AlanCustomCounter());
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        this.counter.inc();
                        System.out.println("line count=" + this.counter.getCount());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, this.counter.getCount()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    // Custom Counter whose no-argument inc()/dec() move the value by 2 instead of 1,
    // which makes it easy to see in the output that this implementation is in use.
    public static class AlanCustomCounter implements Counter {
        private long count;

        @Override
        public void inc() {
            count += 2;
        }

        @Override
        public void inc(long n) {
            count += n;
        }

        @Override
        public void dec() {
            count -= 2;
        }

        @Override
        public void dec(long n) {
            count -= n;
        }

        @Override
        public long getCount() {
            return count;
        }
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=2
result:> (hello,2,1)
result:> (123,2,1)
line count=4
result:> (alan,4,1)
result:> (flink,4,1)
result:> (good,4,1)
line count=6
result:> (alan_chan,6,1)
result:> (hi,6,1)
result:> (flink,4,2)

3. Gauge
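A Gauge provides a value of any type on demand. To use a Gauge, you must first create a class that implements the org.apache.flink.metrics.Gauge interface. There is no restriction on the type of the returned value. You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.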
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsGaugeDemo {

    // Variant 1: a standalone RichMapFunction that exposes an int through a Gauge.
    public static class MyMapper extends RichMapFunction<String, String> {
        private transient int valueToExpose = 0;

        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return valueToExpose;
                }
            });
        }

        @Override
        public String map(String value) throws Exception {
            valueToExpose++;
            return value;
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, String, Integer>: word, "alan lines[<line count>]", word count
        DataStream<Tuple3<String, String, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                    private long result2LineCounter = 0;
                    private Gauge<String> gauge = null;

                    @Override
                    public void open(Configuration config) {
                        // The counter is registered only to read its initial value (0);
                        // the line count itself lives in the long field.
                        result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                        gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                            @Override
                            public String getValue() {
                                return "alan lines[" + result2LineCounter + "]";
                            }
                        });
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                        result2LineCounter++;
                        System.out.println("line count=" + result2LineCounter);
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, gauge.getValue()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count=2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count=3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
Reporters convert the exposed object to a String, which means a meaningful toString() implementation is required.
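Why toString() matters can be seen without Flink at all. In the sketch below, report is a hypothetical helper that stands in for a reporter by calling String.valueOf on whatever the gauge returns; Position and NamedPosition are illustrative value types, not Flink classes.

```java
import java.util.function.Supplier;

class GaugeToStringSketch {
    // Value type WITHOUT toString(): a reporter would only see Object's default text.
    static class Position {
        final long offset;

        Position(long offset) {
            this.offset = offset;
        }
    }

    // Value type WITH a meaningful toString().
    static class NamedPosition extends Position {
        NamedPosition(long offset) {
            super(offset);
        }

        @Override
        public String toString() {
            return "offset=" + offset;
        }
    }

    // Hypothetical stand-in for a reporter: reads the gauge and stringifies the value.
    static String report(Supplier<?> gauge) {
        return String.valueOf(gauge.get());
    }

    public static void main(String[] args) {
        // Default Object.toString(): class name, '@', and a hash code that varies per run.
        System.out.println(report(() -> new Position(42)));
        System.out.println(report(() -> new NamedPosition(42))); // prints offset=42
    }
}
```

Gauges that return a String or a boxed number, as the examples in this article do, already satisfy this requirement.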
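4. Histogram
A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.
The following example implements the Histogram interface by hand, purely to demonstrate what an implementation involves.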
import java.io.Serializable;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsHistogramDemo {

    // Variant 1: a standalone RichMapFunction that feeds every value into the histogram.
    public static class MyMapper extends RichMapFunction<Long, Long> {
        private transient Histogram histogram;

        @Override
        public void open(Configuration config) {
            this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
        }

        @Override
        public Long map(Long value) throws Exception {
            this.histogram.update(value);
            return value;
        }
    }

    public static class AlanHistogram implements Histogram {
        private CircularDoubleArray descriptiveStatistics = new CircularDoubleArray(10);

        public AlanHistogram() {
        }

        public AlanHistogram(int windowSize) {
            this.descriptiveStatistics = new CircularDoubleArray(windowSize);
        }

        @Override
        public void update(long value) {
            this.descriptiveStatistics.addValue(value);
        }

        @Override
        public long getCount() {
            return this.descriptiveStatistics.getElementsSeen();
        }

        @Override
        public HistogramStatistics getStatistics() {
            // Demo only: no statistics are computed, so anything that calls
            // getStatistics() and uses the result will fail.
            // return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
            return null;
        }

        // Fixed-size ring buffer holding the most recent values.
        static class CircularDoubleArray implements Serializable {
            private static final long serialVersionUID = 1L;
            private final double[] backingArray;
            private int nextPos = 0;
            private boolean fullSize = false;
            private long elementsSeen = 0;

            CircularDoubleArray(int windowSize) {
                this.backingArray = new double[windowSize];
            }

            synchronized void addValue(double value) {
                backingArray[nextPos] = value;
                ++elementsSeen;
                ++nextPos;
                if (nextPos == backingArray.length) {
                    nextPos = 0;
                    fullSize = true;
                }
            }

            synchronized double[] toUnsortedArray() {
                final int size = getSize();
                double[] result = new double[size];
                System.arraycopy(backingArray, 0, result, 0, result.length);
                return result;
            }

            private synchronized int getSize() {
                return fullSize ? backingArray.length : nextPos;
            }

            private synchronized long getElementsSeen() {
                return elementsSeen;
            }
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, String, Integer>: word, "alan lines[<line count>]", word count
        DataStream<Tuple3<String, String, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                    private long result2LineCounter = 0;
                    private Gauge<String> gauge = null;
                    private Histogram histogram = null;

                    @Override
                    public void open(Configuration config) {
                        result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                        gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                            @Override
                            public String getValue() {
                                return "alan lines[" + result2LineCounter + "]";
                            }
                        });
                        this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                        result2LineCounter++;
                        this.histogram.update(result2LineCounter * 3);
                        // histogram.getCount() is printed only for illustration; the value has no real meaning here
                        System.out.println("line count=" + result2LineCounter + " histogram:" + this.histogram.getCount());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, gauge.getValue()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=1 histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count=2 histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count=3 histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
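AlanHistogram above returns null from getStatistics(), so it exposes a count but no distribution. As a rough idea of what such statistics involve, the following plain-Java sketch computes min, max, mean, and a quantile over one snapshot of values, such as toUnsortedArray() would return. WindowStats and its nearest-rank quantile rule are assumptions made for illustration; they are not Flink's HistogramStatistics implementation.

```java
import java.util.Arrays;

// Illustrative only: summary statistics over one snapshot of a histogram window.
class WindowStats {
    private final double[] sorted;

    WindowStats(double[] unsortedSnapshot) {
        this.sorted = unsortedSnapshot.clone();
        Arrays.sort(this.sorted);
    }

    int size() {
        return sorted.length;
    }

    double min() {
        return sorted.length == 0 ? Double.NaN : sorted[0];
    }

    double max() {
        return sorted.length == 0 ? Double.NaN : sorted[sorted.length - 1];
    }

    double mean() {
        return Arrays.stream(sorted).average().orElse(Double.NaN);
    }

    // Nearest-rank quantile: the smallest value with at least q * n values at or below it.
    double quantile(double q) {
        if (sorted.length == 0) {
            return Double.NaN;
        }
        int rank = (int) Math.ceil(q * sorted.length);
        rank = Math.min(Math.max(rank, 1), sorted.length); // clamp to a valid 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // The demo job calls update(lineCount * 3), so three input lines yield 3, 6, 9.
        WindowStats stats = new WindowStats(new double[] {9, 3, 6});
        System.out.println(stats.min());         // prints 3.0
        System.out.println(stats.max());         // prints 9.0
        System.out.println(stats.mean());        // prints 6.0
        System.out.println(stats.quantile(0.5)); // prints 6.0
    }
}
```

Sorting a copy on every snapshot is simple but not cheap; it is adequate for a small window such as the 10 elements used by AlanHistogram.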
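Flink does not provide a default Histogram implementation, but it offers a wrapper that allows Codahale/DropWizard histograms to be used. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.17.1</version>
</dependency>

The following example uses a Codahale/DropWizard histogram: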
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsHistogramDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, String, Integer>: word, "alan lines[<line count>]", word count
        DataStream<Tuple3<String, String, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                    private long result2LineCounter = 0;
                    private Gauge<String> gauge = null;
                    private Histogram histogram = null;

                    @Override
                    public void open(Configuration config) {
                        result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                        gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                            @Override
                            public String getValue() {
                                return "alan lines[" + result2LineCounter + "]";
                            }
                        });

                        // Wrap a Dropwizard histogram instead of the hand-written AlanHistogram.
                        com.codahale.metrics.Histogram dropwizardHistogram =
                                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                        this.histogram = getRuntimeContext().getMetricGroup()
                                .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                        result2LineCounter++;
                        this.histogram.update(result2LineCounter * 3);
                        // histogram.getCount() is printed only for illustration; the value has no real meaning here
                        System.out.println("line count=" + result2LineCounter + " histogram:" + this.histogram.getCount());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, gauge.getValue()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=1 histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count=2 histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count=3 histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
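5. Meter
A Meter measures average throughput. The occurrence of an event can be registered with the markEvent() method, and the simultaneous occurrence of multiple events with markEvent(long n). You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.
The example below shows a custom Meter implementation, which may be far from rigorous; in practice the second example in this section is the more common approach.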
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsMeterDemo {

    // Variant 1: a standalone RichMapFunction that marks one event per record.
    public static class MyMapper extends RichMapFunction<Long, Long> {
        private transient Meter meter;

        @Override
        public void open(Configuration config) {
            this.meter = getRuntimeContext().getMetricGroup().meter("myMeter", new AlanMeter());
        }

        @Override
        public Long map(Long value) throws Exception {
            this.meter.markEvent();
            return value;
        }
    }

    public static class AlanMeter implements Meter {
        /** The underlying counter maintaining the count. */
        private final Counter counter = new SimpleCounter();
        /**
         * The time-span over which the average is calculated. The original demo left this at 0
         * (and the array below at null), so update() could never run; 60 seconds with one
         * sample every 5 seconds are assumed values.
         */
        private final int timeSpanInSeconds = 60;
        /** Circular array containing the history of values. */
        private final long[] values = new long[timeSpanInSeconds / 5 + 1];
        /** The index in the array for the current time. */
        private int time = 0;
        /** The last rate we computed. */
        private double currentRate = 0;

        @Override
        public void markEvent() {
            this.counter.inc();
        }

        @Override
        public void markEvent(long n) {
            this.counter.inc(n);
        }

        @Override
        public long getCount() {
            return counter.getCount();
        }

        @Override
        public double getRate() {
            return currentRate;
        }

        // Nothing in this demo calls update(), so getRate() always returns 0.0.
        public void update() {
            time = (time + 1) % values.length;
            values[time] = counter.getCount();
            currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
        }
    }

    public static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, String, Integer>: word, "alan lines[<line count>]", word count
        DataStream<Tuple3<String, String, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                    private long result2LineCounter = 0;
                    private Gauge<String> gauge = null;
                    private Histogram histogram = null;
                    private Meter meter;

                    @Override
                    public void open(Configuration config) {
                        result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                        gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                            @Override
                            public String getValue() {
                                return "alan lines[" + result2LineCounter + "]";
                            }
                        });

                        com.codahale.metrics.Histogram dropwizardHistogram =
                                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                        this.histogram = getRuntimeContext().getMetricGroup()
                                .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));

                        this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                        result2LineCounter++;
                        this.histogram.update(result2LineCounter * 3);
                        this.meter.markEvent();
                        // histogram.getCount() and meter.getRate() are printed only for illustration;
                        // what to do with them depends on the actual use case
                        System.out.println("line count=" + result2LineCounter + ", histogram:" + this.histogram.getCount()
                                + ", meter.getRate:" + this.meter.getRate());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, gauge.getValue()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}

// ---------- verification data ----------
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
line count=1, histogram:1, meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count=2, histogram:2, meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count=3, histogram:3, meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
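The rate printed above stays at 0.0 because nothing in the job ever calls AlanMeter.update(). The arithmetic inside update() can be exercised on its own: the sketch below stores the counter value sampled at each tick in a circular array and divides the difference between the newest and the oldest sample by the time span. RateWindow, the 5-second tick, and the 10-second span are assumptions chosen for the illustration; in a running job the ticks would have to come from a scheduler rather than from manual calls.

```java
// Illustrative only: the circular-buffer rate calculation of update(), without Flink.
class RateWindow {
    private final int timeSpanInSeconds;
    private final long[] values;        // counter value observed at each tick
    private int time = 0;               // index of the newest sample
    private long count = 0;
    private double currentRate = 0.0;

    RateWindow(int timeSpanInSeconds, int tickSeconds) {
        this.timeSpanInSeconds = timeSpanInSeconds;
        // One slot per tick in the window, plus one for the sample that is about to expire.
        this.values = new long[timeSpanInSeconds / tickSeconds + 1];
    }

    void markEvent(long n) {
        count += n;
    }

    double getRate() {
        return currentRate;
    }

    // Must be called once per tick.
    void update() {
        time = (time + 1) % values.length;
        values[time] = count;
        // The slot right after the newest sample holds the oldest sample in the window.
        long oldest = values[(time + 1) % values.length];
        currentRate = (double) (values[time] - oldest) / timeSpanInSeconds;
    }

    public static void main(String[] args) {
        RateWindow meter = new RateWindow(10, 5); // 10 s span, sampled every 5 s
        meter.markEvent(20);
        System.out.println(meter.getRate()); // prints 0.0 because no tick has happened yet
        meter.update();                      // tick at 5 s
        meter.markEvent(30);
        meter.update();                      // tick at 10 s
        System.out.println(meter.getRate()); // prints 5.0, that is 50 events over 10 s
    }
}
```

Until the window has filled once, the oldest slot still holds its initial 0, so early rates are averaged over the full span rather than over the time actually elapsed.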
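Flink provides a wrapper that allows Codahale/DropWizard meters to be used. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>1.17.1</version>
</dependency>

The following example registers a Codahale/DropWizard meter: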
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.codahale.metrics.SlidingWindowReservoir;

/**
 * @author alanchan
 */
public class TestMetricsMeterDemo {

    public static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // Tuple3<String, String, Integer>: word, "alan lines[<line count>]", word count
        DataStream<Tuple3<String, String, Integer>> result = lines
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {
                    private long result2LineCounter = 0;
                    private Gauge<String> gauge = null;
                    private Histogram histogram = null;
                    private Meter meter;

                    @Override
                    public void open(Configuration config) {
                        result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();
                        gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {
                            @Override
                            public String getValue() {
                                return "alan lines[" + result2LineCounter + "]";
                            }
                        });

                        com.codahale.metrics.Histogram dropwizardHistogram =
                                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
                        this.histogram = getRuntimeContext().getMetricGroup()
                                .histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));

                        // Wrap a Dropwizard meter instead of the hand-written AlanMeter.
                        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
                        this.meter = getRuntimeContext().getMetricGroup()
                                .meter("alanMeter", new DropwizardMeterWrapper(dropwizardMeter));
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                        result2LineCounter++;
                        this.histogram.update(result2LineCounter * 3);
                        this.meter.markEvent();
                        // histogram.getCount() and meter.getRate() are printed only for illustration;
                        // what to do with them depends on the actual use case
                        System.out.println("line count=" + result2LineCounter + ", histogram:" + this.histogram.getCount()
                                + ", meter.getRate:" + this.meter.getRate());
                        String[] arr = value.split(",");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, gauge.getValue()));
                        }
                    }
                })
                .map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {
                        return Tuple3.of(value.f0, value.f1, 1);
                    }
                })
                .keyBy(t -> t.f0)
                .sum(2);

        // sink
        result.print("result:");

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}

// console output
line count=1, histogram:1, meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
line count=2, histogram:2, meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
line count=3, histogram:3, meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)
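The 0.0 rates printed above are consistent with how an exponentially weighted moving average behaves when it is read before its first tick. This assumes that DropwizardMeterWrapper.getRate() reports the Dropwizard meter's one-minute rate and that this rate is refreshed only on a fixed tick (taken to be 5 seconds here), so a few lines typed in quick succession are read before any tick has happened. The sketch below is a simplified model of that smoothing, not the Dropwizard code; EwmaSketch and its constants are assumptions.

```java
// Simplified model of a one-minute exponentially weighted moving average (EWMA).
// Assumptions: 5-second tick, 60-second window; this is not the Dropwizard source.
class EwmaSketch {
    private static final double TICK_SECONDS = 5.0;
    private static final double ALPHA = 1 - Math.exp(-TICK_SECONDS / 60.0);

    private long uncounted = 0;          // events marked since the previous tick
    private double rate = 0.0;           // smoothed rate in events per second
    private boolean initialized = false;

    void mark(long n) {
        uncounted += n;
    }

    // Folds the events seen since the previous tick into the smoothed rate.
    void tick() {
        double instantRate = uncounted / TICK_SECONDS;
        uncounted = 0;
        if (initialized) {
            rate += ALPHA * (instantRate - rate);
        } else {
            rate = instantRate;
            initialized = true;
        }
    }

    double getRate() {
        return rate;
    }

    public static void main(String[] args) {
        EwmaSketch m = new EwmaSketch();
        m.mark(3);                              // three lines arrive in quick succession
        System.out.println(m.getRate());        // prints 0.0 because no tick has happened yet
        m.tick();                               // first tick after 5 s
        System.out.println(m.getRate());        // prints 0.6, that is 3 events over 5 s
        m.tick();                               // an idle interval decays the rate
        System.out.println(m.getRate() < 0.6);  // prints true
    }
}
```

Under these assumptions, leaving the job running and sending input for longer than one tick interval should produce a non-zero rate.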
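That concludes this article, which briefly introduced the first part of the Flink metric system: the metric types, with example code for each of the four types.
The topic is split into three parts:
45. Flink metric system, introduction and verification (1): metric types and implementation examples
45. Flink metric system, introduction and verification (2): metric scope, reporters, system metrics, tracing, API integration examples, and dashboard integration
45. Flink metric system, introduction and verification (3): complete version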