怎样做网站分流赚钱,长沙sem推广,电子商务网站建设与管理试卷6,1w粉丝接广告多少钱目录
一、Flink应用分析
1.1 Flink任务生命周期
1.2 Flink应用告警视角分析
二、监控告警方案说明
2.1 监控消息队中间件消费者偏移量
2.2 通过调度系统监控Flink任务运行状态
2.3 引入开源服的SDK工具实现
2.4 调用FlinkRestApi实现任务监控告警
2.5 定时去查询目标库…目录
一、Flink应用分析
1.1 Flink任务生命周期
1.2 Flink应用告警视角分析
二、监控告警方案说明
2.1 监控消息队中间件消费者偏移量
2.2 通过调度系统监控Flink任务运行状态
2.3 引入开源服的SDK工具实现
2.4 调用FlinkRestApi实现任务监控告警
2.5 定时去查询目标库最大时间和当前时间做对比
2.6 自定义指标Reporter的SDK
2.7 任务日志告警
2.8 运行任务探活
三、总结 前言:Flink作为一个高性能实时计算引擎可灵活的嵌入各种场景许多团队为了实现业务交付选择了Flink作为解决方案但是随着Flink应用的增多且出现线上事故对Flink任务异常的监控告警成为迫切需求但是如何实现Flink任务异常监控告警成为了新的问题本文将从多个角度讲述Flink任务监控告警实现方案。 一、Flink应用分析 告警可以从多个角度实现我们先分析Flink任务运行的生命周期然后拆解每个部分分析可以从那些角度去监控Flink任务的异常。
1.1 Flink任务生命周期
按读取数据源有如Kafka、RocketMq、Pulsar等消息队列还有其他数据源区别在是否有记录消费者信息的数据标识
Flink的运行模式session、per-job、application三类运行模式可以分为两类场景单独运行的任务per和application还有Flink集群统一提供资源运行的任务session
任务场景离线任务还是实时任务
Flink任务应用结构图如下 1.2 Flink应用告警视角分析
从数据源头
1.对于消息队列这种本身拥有记录消费者偏移量概念的中间件可以通过监控消费者偏移量的变化来监控Flink任务运行的异常情况
从任务运行时
2.任务层可以通过调度系统的告警插件监控任务运行结果和任务运行状态而监控任务
3.也可以在Flink任务内部引入开源SDK配置开源工具实现
4.或者调用FlinkRestApi实现任务监控告警
从输出结果上
5.可以定时去查询输出结果最后的时间
6.或者在Flink任务里引入Flink的指标SDK自定义Flink任务的指标采集将结果测流输出到目标端自定义监控告警和分析
其他的方式
7.日志告警捕捉运行日志通过关键词监控告警
8.运行任务定时探活
二、监控告警方案说明 钉钉、微信、邮件、电话、http等属于告警方式的选择这里侧重讲对于运行异常事件信息的捕捉。
2.1 监控消息队中间件消费者偏移量 类似Kafka或者RocketMQ这类拥有记录消费者消费队列信息的中间件可以通过服务自身的RestAPI定时计算消费者消费数据lag条数
以下是Kafka消费者告警配置页面 这需要后端自定义实现;
实现方式如下定时通过调用Kafka自己提供的RestApi将Topic和各消费者同步到Mysql,然后配置要监控Topic的消费者告警阈值和告警人员每隔一分钟定时计算该消费者的lag如果Flink任务出现异常本身不提交offset了数据积压量大于阈值就告警。
2.2 通过调度系统监控Flink任务运行状态
市场上有一些任务调度系统比如dolphinscheduler、StreamX等除了提供任务发布的能力还自带监控告警功能通过使用这类产品也能做到监控告警能力。
比如dolphinscheduler
Flink任务发布功能: 告警功能插件 比如StreamPark:
Flink任务发布能力 告警功能插件 2.3 引入开源服务的SDK工具实现 博客上对于Flink监控告警推荐最多的一种方式就是prometheus pushgateway grafana这套方案这套方案需要安装维护prometheus和grafana这两个产品比较重但是这套方案除了可以做到任务监控还可以做到任务指标级的分析这对于后续的任务性能优化有比较好的支持。
具体操作步骤如下
1.安装好prometheus pushgateway这两个服务
2.在Flink代码里加入以下依赖 !-- Prometheus Metrics Reporter --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-metrics-prometheus/artifactIdversion${flink-version}/version/dependency
3.在部署Flink的配置文件里
将flink-metrics-prometheus-1.14.3.jar 包放入到flink安装目录/lib下
修改flink-conf.yaml配置文件设置属性如下 Example configuration: metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter metrics.reporter.promgateway.host: localhost metrics.reporter.promgateway.port: 9091 metrics.reporter.promgateway.jobName: myJob metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false metrics.reporter.promgateway.groupingKey: k1v1;k2v2 metrics.reporter.promgateway.interval: 60 SECONDS 然后启动运行任务指标数据就自动推送到pushgateway里了prometheus会从CC里拉取数据到自己的服务里如下 在grafana里导入prometheus源配置指标就可以看到各种指标的运行状态 总结这种方案需要四个步骤
1.启动prometheuspushgatewaygrafana服务
2.配置Flink安装目录的配置文件、导入prometheus的lib包
3.然后在Flink任务里引入一个prometheus的SDK一起打包启动指标就可以在prometheus看到
4.通过grafana做分析看板和配置告警规则驱动事件告警 这种方式都是开源服务功能但是需要维护和理解成本对于一些轻业务团队有负担但是对有很多Flink任务的团队这是一种可用的方案后续还可以基于历史指标分析做到内存级的性能优化
2.4 调用FlinkRestApi实现任务监控告警 这里要搞清楚Flink集群的生命和Flink任务的生命周期这两个概念Flink集群按生命周期来分运行方式可以分为session模式和其他模式两种这两种的区别分别是Flink集群和Flink任务的资源是否一起释放这关系到是否可以稳定的通过FlinkRestApi捕捉到任务运行状态 对于Flink Sesion集群Flink任务可以反复提交集群的URL是不会变的可以通过固定的URL监控到Flink任务的运行状态 对于per-job和application运行方式Flink任务web的URL是不固定的需要每次都捕捉到启动时的Url才能通过url调用RestAPI返回查询指标
sesion集群样式 per-job和application运行模式提交的任务只会有一个任务且url是随机的。
2.5 定时去查询目标库最大时间和当前时间做对比 这种方式是公司的DB团队给我的想法并且他们最初也是这么做的虽然操作上不美观无法大面积且性能上会造成一些影响但确实可以轻量级的实现对任务异常的监控
具体是怎么做的呢 对于实时任务数据都是实时捕捉的写入目标库的时候数据带有当前时间字段业务理想状态下数据会一直产生查询目标库时间最大的数据与当前时间匹配超出阈值时间范围就告警不理想状态将特殊时间段监控去掉就行这种方式在生成业务种确实能满足任务的异常监控告警需求。 要查询最大时间的数据可以使用如下的 SQL 语句 SELECT time_column FROM table_name ORDER BY time_column DESC LIMIT 1;
2.6 自定义指标Reporter的SDK
1.引入Flink自带的指标SDK
!-- Prometheus Metrics Reporter --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-metrics-prometheus/artifactIdversion${flink-version}/version/dependency
2.类似prometheus将指标类的一些参数自定义捕捉写到目标库将推送到pushgateway改成推送到Kafka然后通过目标库的数据自己做任务异常监控分析 这种方式就是避免了开源维护的成本可以使用产品线自研的一套UI和采集中间件做数据管理减轻了维护成本。
大致步骤是
1.自定义 ReporterFactory 实现 MetricReporterFactory 接口中的 createMetricReporter 方法。
2.自定义 Reporter 继承 AbstractReporter 实现 Scheduled 接口中的相关方法
3.在 META-INF/services 下的配置文件中添加对应的实现类然后在Flink配置里自定义参数。
以写入Kafka为例
实现KafkaReporterFactory
package org.apache.flink.metrics.kafka;import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;import java.util.Properties;/*** Description:* author:i7Yang* create 2024-01-26 20:19**/
public class KafkaReporterFactory implements MetricReporterFactory {Overridepublic MetricReporter createMetricReporter(Properties properties) {return new KafkaReporter();}
}实现自定义KafkaReporter
package org.apache.flink.metrics.kafka;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;
import java.util.stream.Collectors;/*** {link MetricReporter} that exports {link Metric Metrics} via Kafka.*/public class KafkaReporter extends AbstractReporter implements Scheduled {private static final Logger LOGGER LoggerFactory.getLogger(KafkaReporter.class);static final String JOB_ID_VARIABLE job_id;static final String JOB_NAME_VARIABLE job_name;private KafkaProducerString, String kafkaProducer;private ListString metricsFilter new ArrayList();private String topic;private String jobName;private String jobId;Overridepublic void open(MetricConfig metricConfig) {String bootstrapServer metricConfig.getString(bootstrapServers, master:9092,storm1:9092,storm2:9092);String filter metricConfig.getString(filter, );String chunkSize metricConfig.getString(chunkSize, 5);String topic metricConfig.getString(topic, flink_metric);Properties properties new Properties();properties.setProperty(bootstrap.servers, bootstrapServer);properties.setProperty(acks, all);properties.setProperty(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.setProperty(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ClassLoader classLoader Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(null);kafkaProducer new KafkaProducer(properties);Thread.currentThread().setContextClassLoader(classLoader);if (StringUtils.isNotEmpty(filter)) {this.metricsFilter.addAll(Arrays.asList(filter.split(,)));}this.chunkSize Integer.parseInt(chunkSize);this.topic topic;// 获取任务的 jobNamethis.jobName metricConfig.getString(FLINK_JOB_NAME, null);LOGGER.info(job name: {}, jobName);}Overridepublic void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {MapString, String allVariables group.getAllVariables();String jobID allVariables.get(JOB_ID_VARIABLE);if (jobID ! null this.jobId null) {this.jobId jobID;}String jobName allVariables.get(JOB_NAME_VARIABLE);if (jobName ! null this.jobName null) {this.jobName jobName;}LOGGER.info(job id: {}, job name: {}, this.jobId, this.jobName);LOGGER.info(metric group name: {}, metric name: {}, group.getAllVariables(), metricName);// 只有在 filter 里面的 metric 才会被添加super.notifyOfAddedMetric(metric, metricName, group);}Overridepublic void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {super.notifyOfRemovedMetric(metric, metricName, group);}Overridepublic void close() {if (kafkaProducer ! null) {kafkaProducer.close();}}Overridepublic void report() {synchronized (this) {tryReport();}}private void tryReport() {MapString, Object metricMap new HashMap();metricMap.put(jobId, this.jobId);metricMap.put(jobName, this.jobName);JSONArray jsonArray new JSONArray();gauges.forEach((gauge, metricName) - {JSONObject jsonObject new JSONObject();jsonObject.put(metricName, metricName);jsonObject.put(value, gauge.getValue());jsonObject.put(type, Gauge);jsonArray.add(jsonObject);});counters.forEach((counter, metricName) - {JSONObject jsonObject new JSONObject();jsonObject.put(metricName, metricName);jsonObject.put(value, counter.getCount());jsonObject.put(type, Counter);jsonArray.add(jsonObject);});histograms.forEach((histogram, metricName) - {JSONObject jsonObject new JSONObject();jsonObject.put(metricName, metricName);jsonObject.put(value, histogram.getCount());jsonObject.put(type, Histogram);jsonArray.add(jsonObject);});meters.forEach((meter, metricName) - {JSONObject jsonObject new JSONObject();jsonObject.put(metricName, metricName);jsonObject.put(value, meter.getCount());jsonObject.put(type, Meter);jsonArray.add(jsonObject);});metricMap.put(metrics, jsonArray);ProducerRecordString, String record new ProducerRecord(this.topic, this.jobId, JSONObject.toJSONString(metricMap));kafkaProducer.send(record);}Overridepublic String filterCharacters(String input) {return input;}
}flink 的配置文件中设置一下 kafka reporter metrics.reporter.kafka.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory metrics.reporter.kafka.bootstrapServers: master:9092,storm1:9092,storm2:9092 metrics.reporter.kafka.topic: flink_metric metrics.reporter.kafka.filter: inPoolUsage,outPoolUsage,numberOfCompletedCheckpoints,lastCheckpointFullSize,numBytesOutPerSecond,numBuffersOutPerSecond,numRecordsInPerSecond metrics.reporter.kafka.interval: 20 SECONDS 2.7 任务日志告警 将Flink的运行任务集中采集文件日志用LogStagsh,指标日志可在应用里埋点然后通过日志做告警管理。
2.8 运行任务探活 上面2.4节讲了Flink的sesion运行模式可以通过FlinkRestApi获取运行状态和指标但是对于per-job和applicaiton运行方式任务异常失败后restApi是不存在但是对于其使用的资源管理器可以捕捉到任务运行状态比如yarn可以通过shell查询到任务的存活情况可以定时去探活或获取url获取运行时指标。
使用yarn做Flink任务资源管理的命令 定时监控flink任务状态 yarn application -list | grep -w flink任务名 字 | awk {print $1} 返回flink任务url链接 yarn application -list | grep -w flink 任务名字 | awk {print $10} 三、总结 Flink任务告警方式的选择要从任务的使用情况和期盼来考量简单的使用且任务少可以用监控目标数据库的数据写入情况、per-job和application运行任务探活、Sesion运行方式通过RestApi来告警特定场景的业务可以靠监控存储中间偏移量来告警通用大规模应用场景可以通过采集运行时日志、使用调度平台使用调度平台、引入开源SDK方式、自定义SDK写入通用系统通用系统里方式选择。