自助建设网站平台,南京林业大学实验与建设网站,wordpress管理密码忘记,网站建设与管理自简历系列文章目录
上手第一关#xff0c;手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么#xff0c;以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析#xff0c;打破面试难关 防止消息丢失与消息重复——Kafka可…系列文章目录
上手第一关手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析打破面试难关 防止消息丢失与消息重复——Kafka可靠性分析及优化实践 Kafka的重要组件谈谈流处理引擎Kafka Stream 系列文章目录一、Kafka Stream是什么1. 简介2. 特点 二、流程与核心类1. KStream 和 KTable 概念2. 常用逻辑与转换 三、使用场景与Demo1. 实时数据分析2. 实时预测 四、总结 我们前面介绍了很多kafka本身的特性与设计也说了不少原理性的内容本次我们稍微放松一下来介绍一下 Kafka的一个重要组件—— Kafka Stream 作者简介战斧从事金融IT行业有着多年一线开发、架构经验爱好广泛乐于分享致力于创作更多高质量内容 本文收录于 kafka 专栏有需要者可直接订阅专栏实时获取更新 高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新欢迎指导 Zookeeper Redis dubbo docker netty等诸多框架以及架构与分布式专题即将上线敬请期待 一、Kafka Stream是什么
1. 简介
Kafka Stream是 Apache Kafka 的一个子项目它提供了一种简单而快速的方法来对数据流进行处理是一种无状态的流处理引擎可以消费Kafka中的数据并将其转换为输出流。Kafka Stream不像其他流处理工具它是一个Java库能够快速构建、部署和管理数据流处理任务。 我们在前面的文章中《Kafka是什么以及如何使用SpringBoot对接Kafka》 初步接触了kafka的客户端kafka client,当时如果有眼尖的同学应该也注意到了在使用Spring Initializr创建项目时就看见了Kafka Stream的身影 那么Kafka Stream 与 我们当时接触的 Kafka client 有什么联系吗其实它们的共同点在于他们都是与Kafka集成的API从逻辑层次来说Kafka Stream 是建立在 Kafka client 上的我们在引用 Kafka Stream 时 其会自带着 Kafka client 的包如下 那它们的作用到底哪不一样呢具体来说它们的不同之处可从这几个方面看 功能不同 Kafka Stream是用于流处理任务的API它提供了一种简单而快速的方法来对数据流进行处理。相反Kafka Client主要用于生产和消费Kafka消息。 处理方式不同 Kafka Client主要依赖于订阅和轮询来消费Kafka消息。而Kafka Stream依赖于数据流的处理它会自动将Kafka消息转化为数据流并实时处理这些数据。 API调用方式不同 在Kafka Stream中您需要定义一个拓扑结构描述如何将输入流转换成输出流并执行转换。而在Kafka Client中您需要调用API来发送和接收Kafka消息。 应用场景不同 Kafka Stream适用于实时数据分析、实时预测等需要流处理的场景。而Kafka Client更适用于异步数据传输的场景例如日志收集、事件处理等。
2. 特点
我们前面说过流处理引擎做的人也是很多的常见的比如说Apache Flink、Apache Spark Streaming、Apache Storm 以及阿里参考 Apache Storm 开发的Jstorm。既然有如此多的可选项为什么还有Kafka Stream这个东西呢其实说来也简单就是应用简单功能丰富 总计来说其具备以下特点 无需额外征用集群资源 在传统的流处理中需要单独的集群进行数据处理这就意味着需要额外的开销。而Kafka Stream是直接在Kafka集群上执行的不会征用额外的资源。 易于使用 Kafka Stream提供了简单易用的API使得开发人员可以快速地进行流处理任务的开发。它还支持Java 8中的Lambda表达式使得代码更加简洁。 支持丰富的转换操作 Kafka Stream支持丰富的转换操作包括过滤、映射、聚合等。这些操作可以被组合使用以满足不同的处理需求。
二、流程与核心类
1. KStream 和 KTable 概念
我们上面简要介绍了下Kafka Stream的特点。但是要想明白其流程并正确使用我们还需要讲两个核心概念也就是KStream 和 KTable KStream KStream是一个持续不断的流数据记录每个记录都是一个key-value对可以被读取、写入和转换。通常KStream用于处理实时数据流我们可以直接从kafka集群中指定主题来获取源源不断的数据 KTable KTable顾名思义可以看作是一张持久化的、可查询的、支持状态更新的表格。它通常是利用KStream的数据经过一系列转换和聚合操作生成的KTable可以被读取和更新但不能被删除。
KStream和KTable是互补的。KStream可以转换成KTable也可以从KTable中获取值KTable也可以转换成KStream我们可以使用下图看一下是如何针对数据流中出现的单词进行计数并”落表“的
当然我们还有必要提及一下GlobalKTable它是一种特殊的KTableGlobalKTable通常用于处理比较静态的全局数据例如维护一个全局的用户信息表而且只在应用程序启动时从Kafka主题中加载所有数据这意味着需要消耗较大的内存空间。
2. 常用逻辑与转换
我们上面说了KStream 和 KTable 在代码里其实也对应了两个类那这两个类都有些什么方法呢最重要的我们想知道它们是如何互相转换的。
其实关于 KStream 可能有些同学会想到JDK 里的 Stream 因为确实很多方法是一致的所以不用慌张。我们先来介绍下 KStream 的常用方法
filter过滤数据流中不符合条件的记录。map将每个记录转换为一个新的记录可以改变记录的key和value。flatMap与map类似可以将一个记录转换为多个新的记录。mapValues与map类似但记录的键保留不变只改变值groupByKey将记录按key进行分组生成一个KGroupedStream对象可以用于聚合操作。reduce对KGroupedStream对象进行聚合操作。join将两个KStream对象进行join操作生成一个新的KStream对象。windowed对KStream对象进行窗口操作可以使用时间窗口或大小窗口。aggregate将当前流中的记录聚合并生成一个新的KTable。与reduce方法不同aggregate方法不仅考虑当前记录还考虑之前记录的聚合结果to将结果输出到输出主题中
我们举一个小代码段来看下这些方法的使用
KStreamString, String input ...;
// 使用filter方法过滤出包含important的值
KStreamString, String filtered input.filter((key, value) - value.contains(important))
// 使用mapValues方法将每个值的长度作为新值。
KStreamString, Integer mapped filtered.mapValues(value - value.length());
// 使用groupBy方法将键值对按键分组并使用count方法计算每个键出现的次数将结果存储在KTable中
KTableString, Integer counted mapped.groupBy((key, value) - key).count(Materialized.as(counts));
// 使用selectKey方法选取值中-前的部分作为新键
KStreamString, String rekeyed input.selectKey((key, value) - value.split(-)[0]);
// 使用leftJoin方法将两个KStream进行左连接即mapped流和rekeyed流进行连接
// 连接的条件是两个流中的键相等。连接函数的定义是将两个整型值相加并将结果作为连接后的流的值
KStreamString, Integer joined mapped.leftJoin(rekeyed, (value1, value2) - value1 value2);
// 使用groupByKey方法对键值对按键分组并使用windowedBy方法将窗口大小设置为5分钟
// 然后使用count方法计算每个键在此时间窗口中出现的次数最后使用toStream方法将结果
// 转换为KStream类型并将时间窗口的起止时间设置为键值设置为次数
KStreamString, Long windowed input .groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count().toStream().map((key, value) - new KeyValue(key.key(), value));
// 将结果输出到输出主题中
windowed.to(output-topic);而关于KTable也有一些常用方法如下
filter根据指定的谓词过滤记录并返回一个新的KTable。谓词是一个接受key和value作为参数的函数如果返回true则保留该记录否则过滤掉。mapValues对KTable中的每个value执行指定的转换函数并返回一个新的KTable。groupBy根据指定的key进行分组并返回一个KGroupedTable对象该对象用于执行各种聚合操作。join将当前KTable与另一个KTable或KStream进行连接并返回一个新的KTable。toStream将KTable转换为KStream并返回一个新的KStream对象。
我们也写一小段代码用于演示
// 从输入流中建立一个KTable
StreamsBuilder builder new StreamsBuilder();
KTableString, String myKTable builder.table(input-topic, Materialized.as(ktable-store));// 1. 执行一些过滤操作保留包含特定前缀的键
KTableString, String filteredKTable myKTable.filter((key, value) - key.startsWith(prefix));// 2. 执行mapValues操作将每个键值对中的value进行大写转换
KTableString, String uppercasedKTable myKTable.mapValues(e - e.toUpperCase());// 3. 执行groupBy操作将键值对按照key的前缀分组
KTableString, String groupedKTable myKTable.groupBy((key, value) - KeyValue.pair(key.split(_)[0], value)).reduce((aggValue, newValue) - aggValue _ newValue);// 4. 执行leftJoin操作将两个KTable进行连接如果某一个KTable中没有该key则用null进行填充
KTableString, String leftJoinedKTable myKTable.leftJoin(filteredKTable,(value1, value2) - value1 - value2);// 5. 执行toStream操作将KTable转换为KStream类型
myKTable.toStream().map((key, value) - KeyValue.pair(key, value.toUpperCase()));当然关于上述哪些方法我们也可以用一张图来概括它们之间的转换关系如下图其中的 KGroupedStream 和 KGroupedTable 其实就是KStream 和 KTable 进行聚合操作后的产物
三、使用场景与Demo
1. 实时数据分析
Kafka Stream可以将实时到达的数据进行处理以便进行实时数据分析。在这种情况下Kafka Stream通常会将数据转换为一些有用的信息以便于更好的理解数据我们可以举一个简单的示例demo 假设我们有一个数据流其中包含电影评分信息和电影相关信息。我们的任务是计算出每个电影的平均评分。 首先我们需要定义输入数据流所需的数据结构。假设我们的数据结构如下
Data
public class MovieRating {private String movieId;private float rating;
}Data
public class Movie {private String movieId;private String title;
}接下来我们需要编写Kafka流应用程序。我们可以将其分为三个步骤
1.从Kafka主题读取电影评分和电影相关信息。 2.以电影ID为键将电影评分聚合到一个窗口中并计算平均值。 3.将结果写入新的Kafka主题。
public static void main(String[] args) throws Exception {Properties props new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, movie-ratings-app);props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);final StreamsBuilder builder new StreamsBuilder();// 步骤1从kafka主题中读取电影信息及评分// 我们假设主题包含Avro编码的数据KStreamString, MovieRating ratings builder.stream(movie-ratings);KStreamString, Movie movies builder.stream(movies);// 步骤2: 按电影ID聚合评分并计算平均评分.KTableWindowedString, Double averageRatings ratings.groupBy((key, value) - value.getMovieId()).windowedBy(TimeWindows.of(Duration.ofMinutes(10))).aggregate(() - new RatingAggregate(0.0, 0L),(key, value, aggregate) - new RatingAggregate(aggregate.getSum() value.getRating(), aggregate.getCount() 1),Materialized.with(Serdes.String(), new RatingAggregateSerde())).mapValues((aggregate) - aggregate.getSum() / aggregate.getCount()).toStream().groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(10))).reduce((value1, value2) - Math.max(value1, value2),Materialized.with(Serdes.String(), Serdes.Double())).toStream().map((key, value) - new KeyValue(key.key(), value));// 步骤3: 将结果写入一个新的kafka主题.averageRatings.to(average-ratings);final KafkaStreams streams new KafkaStreams(builder.build(), props);streams.start();
}// 用于聚合评分的辅助类
public static class RatingAggregate {private double sum;private long count;public RatingAggregate(double sum, long count) {this.sum sum;this.count count;}public double getSum() {return sum;}public long getCount() {return count;}
}// 序列化与反序列化.
public static class RatingAggregateSerde extends Serdes.WrapperSerdeRatingAggregate {public RatingAggregateSerde() {super(new JsonSerializer(), new JsonDeserializer(RatingAggregate.class));}
}
在上面的代码中我们使用Serdes.String()和SpecificAvroSerde来序列化和反序列化字符串和Avro-encoded对象。我们使用TimeWindows.of(Duration.ofMinutes(10))定义大小为10分钟的窗口。我们使用RatingAggregate类来辅助计算每个电影的平均评分RatingAggregateSerde类来序列化和反序列化RatingAggregate对象
2. 实时预测
Kafka Stream可以用于实时预测任务例如在一些应用中需要根据实时到达的数据来进行预测。Kafka Stream可以使用已有的模型对实时数据进行预测从而实现实时的推荐或预测等功能。 还是拿电影举例我们经常可以看到电影票房的预测我们可以以此写一个Demo public class MovieProcessor {private static final String INPUT_TOPIC box-office-input;private static final String OUTPUT_TOPIC box-office-output;public static void main(String[] args) {// 创建 Kafka Streams 配置Properties props new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, box-office-predictor);props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// 创建 Kafka StreamsStreamsBuilder builder new StreamsBuilder();KStreamString, String inputStream builder.stream(INPUT_TOPIC);// 将上映日期转换为毫秒数并计算出预测票房KTableLong, Double boxOfficePrediction inputStream.mapValues(new ValueMapperString, Double() {Overridepublic Double apply(String value) {String[] fields value.split(,);long releaseDateMillis LocalDate.parse(fields[1]).toEpochDay() * 24 * 60 * 60 * 1000;int runtime Integer.parseInt(fields[2]);double boxOffice Double.parseDouble(fields[3]);double prediction boxOffice / (runtime * 60 * 1000.0) * (releaseDateMillis - System.currentTimeMillis());return prediction 0 ? prediction : 0;}}).groupBy(new KeyValueMapperString, Double, Long() {Overridepublic Long apply(String key, Double value) {return 1L;}}).reduce(new ReducerDouble() {Overridepublic Double apply(Double value1, Double value2) {return value1 value2;}}).mapValues(new ValueMapperDouble, Double() {Overridepublic Double apply(Double value) {return value / (24 * 60 * 60 * 1000.0);}});// 将预测结果发送到 Kafka Topic 中boxOfficePrediction.toStream().to(prediction);// 启动 Kafka StreamsKafkaStreams streams new KafkaStreams(builder.build(), props);streams.start();}
}
四、总结
今天我们学了一些关于Kafka Stream的内容太知道了它是一种流处理引擎可以消费Kafka中的数据进行处理后还能其转换为输出流。它特点在于不需要额外征用集群资源、易于使用、支持丰富的转换操作。使用场景包括实时数据分析、实时预测等。但其实Kafka Stream的内容还是很多的我们将在后面的学习中继续讲解