珠宝类企业网站(手机端),注册新公司的流程,海安县建设局网站,wordpress分类自定义字段调用Kafka 现在在企业和互联网项目中的应用越来越多了#xff0c;本篇文章就从 Kafka 的基础开始带你一展 Kafka 的宏图。
作者#xff1a;cxuan Kafka 现在在企业和互联网项目中的应用越来越多了#xff0c;本篇文章就从 Kafka 的基础开始带你一展 Kafka 的宏图。 图片来自 Pe…Kafka 现在在企业和互联网项目中的应用越来越多了本篇文章就从 Kafka 的基础开始带你一展 Kafka 的宏图。
作者cxuan Kafka 现在在企业和互联网项目中的应用越来越多了本篇文章就从 Kafka 的基础开始带你一展 Kafka 的宏图。 图片来自 Pexels
什么是 Kafka
Kafka 是一个分布式流式平台它有三个关键能力
订阅发布记录流它类似于企业中的消息队列或企业消息传递系统。以容错的方式存储记录流。实时记录流。
Kafka 的应用
作为消息系统。作为存储系统。作为流处理器。
Kafka 可以建立流数据管道可靠地在系统或应用之间获取数据。建立流式应用传输和响应数据。
Kafka 作为消息系统 Kafka 作为消息系统它有三个基本组件
Producer : 发布消息的客户端Broker一个从生产者接受并存储消息的客户端Consumer : 消费者从 Broker 中读取消息
在大型系统中会需要和很多子系统做交互也需要消息传递在诸如此类系统中你会找到源系统(消息发送方)和目的系统(消息接收方)。
为了在这样的消息系统中传输数据你需要有合适的数据管道 这种数据的交互看起来就很混乱如果我们使用消息传递系统那么系统就会变得更加简单和整洁。 Kafka 运行在一个或多个数据中心的服务器上作为集群运行
Kafka 集群存储消息记录的目录被称为 Topics。每一条消息记录包含三个要素键(Key)、值(Value)、时间戳(Timestamp)。
核心 API
Kafka 有四个核心 API它们分别是 Producer API它允许应用程序向一个或多个 Topics 上发送消息记录。Consumer API允许应用程序订阅一个或多个 Topics 并处理为其生成的记录流。Streams API它允许应用程序作为流处理器从一个或多个主题中消费输入流并为其生成输出流有效的将输入流转换为输出流。Connector API它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如关系数据库的连接器可能会捕获对表的所有更改。
Kafka 基本概念
Kafka 作为一个高度可扩展可容错的消息系统它有很多基本概念下面就来认识一下这些 Kafka 专属的概念。
Topic
Topic 被称为主题在 Kafka 中使用一个类别属性来划分消息的所属类划分消息的这个类称为 Topic。
Topic 相当于消息的分配标签是一个逻辑概念。主题好比是数据库的表或者文件系统中的文件夹。
Partition
Partition 译为分区Topic 中的消息被分割为一个或多个的 Partition它是一个物理概念对应到系统上就是一个或若干个目录一个分区就是一个提交日志。消息以追加的形式写入分区先后以顺序的方式读取。 注意由于一个主题包含无数个分区因此无法保证在整个 Topic 中有序但是单个 Partition 分区可以保证有序。消息被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性。
分区可以分布在不同的服务器上也就是说一个主题可以跨越多个服务器以此来提供比单个服务器更强大的性能。
Segment
Segment 被译为段将 Partition 进一步细分为若干个 Segment每个 Segment 文件的大小相等。
Broker
Kafka 集群包含一个或多个服务器每个 Kafka 中服务器被称为 Broker。Broker 接收来自生产者的消息为消息设置偏移量并提交消息到磁盘保存。
Broker 为消费者提供服务对读取分区的请求作出响应返回已经提交到磁盘上的消息。
Broker 是集群的组成部分每个集群中都会有一个 Broker 同时充当了集群控制器(Leader)的角色它是由集群中的活跃成员选举出来的。
每个集群中的成员都有可能充当 LeaderLeader 负责管理工作包括将分区分配给 Broker 和监控 Broker。
集群中一个分区从属于一个 Leader但是一个分区可以分配给多个 Broker(非 Leader)这时候会发生分区复制。
这种复制的机制为分区提供了消息冗余如果一个 Broker 失效那么其他活跃用户会重新选举一个 Leader 接管。 Producer
生产者即消息的发布者其会将某 Topic 的消息发布到相应的 Partition 中。
生产者在默认情况下把消息均衡地分布到主题的所有分区上而并不关心特定消息会被写到哪个分区。不过在某些情况下生产者会把消息直接写到指定的分区。
Consumer
消费者即消息的使用者一个消费者可以消费多个 Topic 的消息对于某一个 Topic 的消息其只会消费同一个 Partition 中的消息。 在了解完 Kafka 的基本概念之后我们通过搭建 Kafka 集群来进一步深刻认识一下 Kafka。
确保安装环境
安装 Java 环境
在安装 Kafka 之前先确保 Linux 环境上是否有 Java 环境使用 java -version 命令查看 Java 版本推荐使用 Jdk 1.8 。
如果没有安装 Java 环境的话可以按照这篇文章进行安装
https://www.cnblogs.com/zs-notes/p/8535275.html
安装 Zookeeper 环境
Kafka 的底层使用 Zookeeper 储存元数据确保一致性所以安装 Kafka 前需要先安装 ZookeeperKafka 的发行版自带了 Zookeeper 可以直接使用脚本来启动不过安装一个 Zookeeper 也不费劲。
Zookeeper 单机搭建
Zookeeper 单机搭建比较简单直接从官网下载一个稳定版本的 Zookeeper
https://www.apache.org/dyn/closer.cgi/zookeeper/
这里我使用的是 3.4.10下载完成后在 Linux 系统中的 /usr/local 目录下创建 Zookeeper 文件夹。
然后使用 xftp 工具(xftp 和 xshell 工具都可以在官网 https://www.netsarang.com/zh/xshell/ 申请免费的家庭版)把下载好的 Zookeeper 压缩包放到 /usr/local/zookeeper 目录下。
如果下载的是一个 tar.gz 包的话直接使用 tar -zxvf zookeeper-3.4.10.tar.gz 解压即可。
如果下载的是 zip 包的话还要检查一下 Linux 中是否有 unzip 工具如果没有的话使用 yum install unzip 安装 zip 解压工具完成后使用 unzip zookeeper-3.4.10.zip 解压即可。
解压完成后cd 到 /usr/local/zookeeper/zookeeper-3.4.10 创建一个 data 文件夹然后进入到 conf 文件夹下使用 mv zoo_sample.cfg zoo.cfg 进行重命名操作。
然后使用 vi 打开 zoo.cfg 更改一下dataDir/usr/local/zookeeper/zookeeper-3.4.10/data 保存。
进入 bin 目录启动服务输入命令 ./zkServer.sh start 输出下面内容表示搭建成功 关闭服务输入命令./zkServer.sh stop 使用 ./zkServer.sh status 可以查看状态信息。
Zookeeper 集群搭建
①准备条件
需要三个服务器这里我使用了 CentOS7 并安装了三个虚拟机并为各自的虚拟机分配了 1GB 的内存在每个 /usr/local/ 下面新建 Zookeeper 文件夹。
把 Zookeeper 的压缩包挪过来解压完成后会有 zookeeper-3.4.10 文件夹进入到文件夹新建两个文件夹分别是 data 和 log 文件夹。
注上一节单机搭建中已经创建了一个 data 文件夹就不需要重新创建了直接新建一个 log 文件夹对另外两个新增的服务需要新建这两个文件夹。
②设置集群
新建完成后需要编辑 conf/zoo.cfg 文件三个文件的内容如下
tickTime2000
initLimit10
syncLimit5
dataDir/usr/local/zookeeper/zookeeper-3.10/data
dataLogDir/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort12181
server.1192.168.1.7:12888:13888
server.2192.161.8:12888:13888
server.3192.168.1.9:12888:13888
server.1 中的这个 1 表示的是服务器的标识也可以是其他数字表示这是第几号服务器这个标识要和下面我们配置的 myid 的标识一致。
192.168.1.7:12888:13888 为集群中的 ip 地址第一个端口表示的是 master 与 slave 之间的通信接口默认是 2888。
第二个端口是 Leader 选举的端口集群刚启动的时候选举或者 Leader 挂掉之后进行新的选举的端口默认是 3888。
现在对上面的配置文件进行解释
tickTime这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔也就是每个 tickTime 时间就会发送一个心跳。initLimit这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息那么表明这个客户端连接失败。总的时间长度就是 5*200010 秒。
syncLimit这个配置项标识 Leader 与 Follower 之间发送消息请求和应答时间长度最长不能超过多少个 tickTime 的时间长度总的时间长度就是 5*200010 秒。dataDir快照日志的存储路径。dataLogDir事务日志的存储路径如果不配置这个那么事务日志会默认存储到 dataDir 指定的目录这样会严重影响 ZK 的性能当 ZK 吞吐量较大的时候产生的事务日志、快照日志太多。clientPort这个端口就是客户端连接 Zookeeper 服务器的端口Zookeeper 会监听这个端口接受客户端的访问请求。
③创建 myid 文件
在了解完其配置文件后现在来创建每个集群节点的 myid 我们上面说过这个 myid 就是 server.1 的这个 1 类似的需要为集群中的每个服务都指定标识使用 echo 命令进行创建
# server.1
echo 1 /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo 2 /usr/local/zookeeper/zookeeper-3.10/data/myid
# server.3
echo 3 /usr/local/zookeeper/zookeeper-3.4.10/data/myid
④启动服务并测试
配置完成为每个 ZK 服务启动并测试我在 Windows 电脑的测试结果如下。
启动服务(每台都需要执行)
cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start
使用 ./zkServer.sh status 命令检查服务状态
192.168.1.7 --- follower 192.168.1.8 --- leader 192.168.1.9 --- follower ZK 集群一般只有一个 Leader多个 Follower主一般是相应客户端的读写请求而从主同步数据当主挂掉之后就会从 Follower 里投票选举一个 Leader 出来。
Kafka 集群搭建
准备条件
准备条件如下
搭建好的 Zookeeper 集群Kafka 压缩包
https://www.apache.org/dyn/closer.cgi?path/kafka/2.3.0/kafka_2.12-2.3.0.tgz
在 /usr/local 下新建 Kafka 文件夹然后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下使用 tar -zxvf 压缩包进行解压。
解压完成后进入到 kafka_2.12-2.3.0 目录下新建 log 文件夹进入到 config 目录下。
我们可以看到有很多 properties 配置文件这里主要关注 server.properties 这个文件即可。 Kafka 启动方式有两种
一种是使用 Kafka 自带的 Zookeeper 配置文件来启动(可以按照官网来进行启动并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker)。一种是通过使用独立的 ZK 集群来启动这里推荐使用第二种方式使用 ZK 集群来启动。
②修改配置项
需要为每个服务都修改一下配置项也就是 server.properties 需要更新和添加的内容有
broker.id0 //初始是0每个 server 的broker.id 都应该设置为不一样的就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs/usr/local/kafka/kafka_12-2.3.0/log #在log.retention.hours168 下面新增下面三项
message.max.byte5242880
default.replication.factor2
replica.fetch.max.bytes5242880 #设置zookeeper的连接端口
zookeeper.connect192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置项的含义
broker.id0 #当前机器在集群中的唯一标识和zookeeper的myid性质一样 port9092 #当前kafka对外提供服务的端口默认是9092 host.name192.168.1.7 #这个参数默认是关闭的在0.8.1有个bugDNS解析问题失败率的问题。 num.network.threads3 #这个是borker进行网络处理的线程数 num.io.threads8 #这个是borker进行I/O处理的线程数 log.dirs/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录这个目录可以配置为“”逗号分割的表达式上面的num.io.threads要大于这个目录的个数这个目录如果配置多个目录新创建的topic他把消息持久化的地方是当前以逗号分割的目录中那个分区数最少就放那一个 socket.send.buffer.bytes102400 #发送缓冲区buffer大小数据不是一下子就发送的先回存储到缓冲区了到达一定的大小后在发送能提高性能 socket.receive.buffer.bytes102400 #kafka接收缓冲区大小当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数这个值不能超过java的堆栈大小 num.partitions1 #默认的分区数一个topic默认1个分区数 log.retention.hours168 #默认消息的最大持久化时间168小时7天 message.max.byte5242880 #消息保存的最大值5M default.replication.factor2 #kafka保存消息的副本数如果一个副本失效了另一个还可以继续提供服务 replica.fetch.max.bytes5242880 #取消息的最大直接数 log.segment.bytes1073741824 #这个参数是因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件 log.retention.check.interval.ms300000 #每隔300000毫秒去检查上面配置的log失效时间log.retention.hours168 到目录查看是否有过期的消息如果有删除 log.cleaner.enablefalse #是否启用log压缩一般不用启用启用的话可以提高性能 zookeeper.connect192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
③启动 Kafka 集群并测试
启动服务进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下
# 启动后台进程
./kafka-server-start.sh -daemon ../config/server.properties
检查服务是否启动
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
Kafka 已经启动创建 Topic 来验证是否创建成功
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下
bin/kafka-topics.sh --create --zookeeper 19168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
对上面的解释
Replication-factor 2复制两份Partitions 1创建1个分区Topic创建主题
查看我们的主题是否创建成功
bin/kafka-topics.sh --list --zookeeper 192.168.7:2181 启动一个服务就能把集群启动起来。
在一台机器上创建一个发布者
# 创建一个broker发布者
./kafka-console-producer.sh --broker-list 19168.1.7:9092 --topic cxuantopic
在一台服务器上创建一个订阅者
# 创建一个consumer 消费者
bin/kafka-console-consumer.sh --bootstrap-server 19168.1.7:9092 --topic cxuantopic --from-beginning
注意这里使用 --zookeeper 的话可能出现 zookeeper is not a recognized option 的错误这是因为 Kafka 版本太高需要使用 --bootstrap-server 指令。
测试结果如下
发布 消费 ④其他命令
显示 topic
bin/kafka-topics.sh --list --zookeeper 192.168.7:2181 # 显示
cxuantopic
查看 topic 状态
bin/kafka-topics.sh --describe --zookeeper 192.168.7:2181 --topic cxuantopic # 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 # 分区为为1 复制因子为2 主题 cxuantopic 的分区为0
# Replicas: 0,1 复制的为12
Leader 负责给定分区的所有读取和写入的节点每个节点都会通过随机选择成为 Leader。
Replicas 是为该分区复制日志的节点列表无论它们是 Leader 还是当前处于活动状态。
Isr 是同步副本的集合。它是副本列表的子集当前仍处于活动状态并追随Leader。至此Kafka 集群搭建完毕。
⑤验证多节点接收数据
刚刚我们都使用的是相同的 ip 服务下面使用其他集群中的节点验证是否能够接受到服务。
在另外两个节点上使用
bin/kafka-console-consumer.sh --bootstrap-server 192.168.7:9092 --topic cxuantopic --from-beginning
然后再使用 Broker 进行消息发送经测试三个节点都可以接受到消息。
配置详解
在搭建 Kafka 的时候我们简单介绍了一下 server.properties 中配置的含义现在我们来详细介绍一下参数的配置和概念。
常规配置
这些参数是 Kafka 中最基本的配置
broker.id每个 Broker 都需要有一个标识符使用 broker.id 来表示。它的默认值是 0它可以被设置成其他任意整数在集群中需要保证每个节点的 broker.id 都是唯一的。
port如果使用配置样本来启动 Kafka 它会监听 9092 端口修改 port 配置参数可以把它设置成其他任意可用的端口。
zookeeper.connect用于保存 Broker 元数据的地址是通过 zookeeper.connect 来指定。
localhost2181表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表。
每一部分含义如下
hostname 是 Zookeeper 服务器的服务名或 IP 地址。port 是 Zookeeper 连接的端口。/path 是可选的 Zookeeper 路径作为 Kafka 集群的 chroot 环境。如果不指定默认使用跟路径。
log.dirsKafka 把消息都保存在磁盘上存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。
如果指定了多个路径那么 Broker 会根据 最少使用 原则把同一分区的日志片段保存到同一路径下。
要注意Broker 会向拥有最少数目分区的路径新增分区而不是向拥有最小磁盘空间的路径新增分区。
num.recovery.threads.per.data.dir对于如下三种情况Kafka 会使用可配置的线程池来处理日志片段
服务器正常启动用于打开每个分区的日志片段。服务器崩溃后启动用于检查和截断每个分区的日志片段。服务器正常关闭用于关闭日志片段。
默认情况下每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到所以完全可以设置大量的线程来达到并行操作的目的。
特别是对于包含大量分区的服务器来说一旦发生崩溃在进行恢复时使用井行操作可能会省下数小时的时间。
设置此参数时需要注意所配置的数字对应的是 log.dirs 指定的单个日志目录。
也就是说如果 num.recovery.threads.per.data.dir 被设为 8并且 log.dir 指定了 3 个路径那么总共需要 24 个线程。
auto.create.topics.enable默认情况下Kafka 会在如下 3 种情况下创建主题
当一个生产者开始往主题写入消息时当一个消费者开始从主题读取消息时当任意一个客户向主题发送元数据请求时
delete.topic.enable如果你想要删除一个主题你可以使用主题管理工具。
默认情况下是不允许删除主题的delete.topic.enable 的默认值是 false 因此你不能随意删除主题。
这是对生产环境的合理性保护但是在开发环境和测试环境是可以允许你删除主题的所以如果你想要删除主题需要把 delete.topic.enable 设为 true。
主题默认配置
Kafka 为新创建的主题提供了很多默认配置参数下面就来一起认识一下这些参数。
num.partitionsnum.partitions 参数指定了新创建的主题需要包含多少个分区。
如果启用了主题自动创建功能(该功能是默认启用的)主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意我们可以增加主题分区的个数但不能减少分区的个数。
default.replication.factor这个参数比较简单它表示 Kafka 保存消息的副本数。
如果一个副本失效了另一个还可以继续提供服务default.replication.factor 的默认值为 1这个参数在你启用了主题自动创建功能后有效。
log.retention.msKafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间默认是 168 个小时也就是一周。
除此之外还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的都是决定消息多久以后被删除推荐使用 log.retention.ms。
log.retention.bytes另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定作用在每一个分区上。
也就是说如果有一个包含 8 个分区的主题并且 log.retention.bytes 被设置为 1GB那么这个主题最多可以保留 8GB 数据。
所以当主题的分区个数增加时整个主题可以保留的数据也随之增加。
log.segment.bytes上述的日志都是作用在日志片段上而不是作用在单个消息上。
当消息到达 Broker 时它们被追加到分区的当前日志片段上当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时当前日志片段就会被关闭一个新的日志片段被打开。
如果一个日志片段被关闭就开始等待过期。这个参数的值越小就越会频繁的关闭和分配新文件从而降低磁盘写入的整体效率。
log.segment.ms上面提到日志片段经关闭后需等待过期那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数。
log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭就看哪个条件先得到满足。
message.max.bytesBroker 通过设置 message.max.bytes 参数来限制单个消息的大小默认是 1000 000 也就是 1MB。
如果生产者尝试发送的消息超过这个大小不仅消息不会被接收还会收到 Broker 返回的错误消息。
跟其他与字节相关的配置参数一样该参数指的是压缩后的消息大小也就是说只要压缩后的消息小于 mesage.max.bytes那么消息的实际大小可以大于这个值。
这个值对性能有显著的影响。值越大那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小从而影响 IO 吞吐量。
文章参考
Kafka【第一篇】Kafka 集群搭建https://juejin.im/post/5ba792f5e51d450e9e44184dhttps://blog.csdn.net/k393393/article/details/93099276《Kafka 权威指南》https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/
阅读目录置顶)(长期更新计算机领域知识
阅读目录置顶)(长期更新计算机领域知识
阅读目录置顶)(长期科技领域知识
歌谣带你看java面试题