1. Cluster Planning
In the regular mode, metadata lives in ZooKeeper and a controller is elected dynamically at runtime; that controller manages the Kafka cluster. In KRaft mode (experimental), the ZooKeeper cluster is no longer needed: three controller nodes take its place, metadata is stored in the controllers, and the controllers manage the Kafka cluster directly.
The benefits are:
- Kafka no longer depends on an external framework and can run on its own.
- When managing the cluster, the controller no longer has to read data from ZooKeeper first, so cluster performance improves.
- Without ZooKeeper, cluster scaling is no longer limited by ZooKeeper's read/write capacity.
- Controllers are no longer elected dynamically but are fixed in the configuration file, so you can deliberately strengthen the controller nodes instead of being stuck with high load landing on a random controller node.
kfka1 (192.172.21.120)    kafka
kfka2 (192.172.21.121)    kafka
kfka3 (192.172.21.122)    kafka
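Optionally, if you want the nodes to reach each other by hostname, a minimal sketch for /etc/hosts on every node (names and IPs taken from the table above; adjust to your environment):

cat >> /etc/hosts <<'EOF'
192.172.21.120 kfka1
192.172.21.121 kfka2
192.172.21.122 kfka3
EOF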
2. Cluster Deployment
1. Download the Kafka binary package
https://kafka.apache.org/downloads
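For example, the archive can be fetched straight onto each node; the URL below assumes the 3.7.0 release from the Apache archive and may differ for your mirror or version:

wget -P /data https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz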
2. Extract the package
tar -zxvf /data/kafka_2.13-3.7.0.tgz
3. Edit the configuration file (using node kfka1, 192.172.21.120, as the example)
cd /usr/kafka/kafka_2.13-3.7.0/config/kraft
vi server.properties
Note: in KRaft mode the configuration file lives in the kraft subdirectory of the config directory.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the License); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
# Role
process.roles=broker,controller

# The node id associated with this instance's roles
# Node ID
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@192.172.21.120:19093,2@192.172.21.121:19093,3@192.172.21.122:19093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with process.roles=broker,controller) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 19092.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:19092
listeners=SASL_PLAINTEXT://192.172.21.120:19092,CONTROLLER://192.172.21.120:19093
# Name of listener used for communication between brokers.
inter.broker.listener.name=SASL_PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for listeners.
advertised.listeners=SASL_PLAINTEXT://192.172.21.120:19092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in listener.security.protocol.map, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
# CONTROLLER:SASL_PLAINTEXT is the entry that needs to be changed here
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Require authorization: deny access when no ACL is found
allow.everyone.if.no.acl.found=false
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/datas

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics __consumer_offsets and __transaction_state
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# The authentication mechanism used is PLAIN, the simplest one; its drawback is that users cannot be added dynamically
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
sasl.mechanism=PLAIN
# Disable automatic topic creation
auto.create.topics.enable=false
# Require authorization: deny access when no ACL is found
allow.everyone.if.no.acl.found=false
# Configure the superuser
super.users=User:admin
# This authorizer was newly introduced in 3.2.0; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# Authentication mechanism used between the cluster's controller nodes
sasl.mechanism.controller.protocol=PLAIN

5. Modify the configuration file on the other nodes
Edit server.properties on 192.172.21.121 and 192.172.21.122, changing:
1. node.id
Note: node.id must not repeat; it is unique across the cluster and its value has to match the corresponding entry in controller.quorum.voters.
2. advertised.listeners
Change the advertised.listeners address according to each host.
3. listeners
Change the listeners address according to each host's IP.

# 192.172.21.121
# Node ID
node.id=2
# The ports this server binds to
listeners=SASL_PLAINTEXT://192.172.21.121:19092,CONTROLLER://192.172.21.121:19093
# Listener name, hostname and port the broker advertises to clients (the address the broker exposes externally).
# If not set, the value of listeners is used.
advertised.listeners=SASL_PLAINTEXT://192.172.21.121:19092

# 192.172.21.122
# Node ID
node.id=3
# The ports this server binds to
listeners=SASL_PLAINTEXT://192.172.21.122:19092,CONTROLLER://192.172.21.122:19093
# Listener name, hostname and port the broker advertises to clients (the address the broker exposes externally).
# If not set, the value of listeners is used.
advertised.listeners=SASL_PLAINTEXT://192.172.21.122:19092
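If you prefer to copy the file from kfka1 and then adjust it per node, a rough sketch (the path follows the earlier cd step and is an assumption for your layout):

scp /usr/kafka/kafka_2.13-3.7.0/config/kraft/server.properties root@192.172.21.121:/usr/kafka/kafka_2.13-3.7.0/config/kraft/
scp /usr/kafka/kafka_2.13-3.7.0/config/kraft/server.properties root@192.172.21.122:/usr/kafka/kafka_2.13-3.7.0/config/kraft/

Afterwards edit node.id, listeners and advertised.listeners on each host as shown above.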
6. Create the KRaft account/password (SASL) authentication file
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="password"
    user_admin="password"
    user_test="test";
};

username/password is the account the brokers themselves use when authenticating. user_admin="password" defines a user named admin with the password "password"; at least one such entry is required, and it must match the username and password above. user_test="test" is a second user, an account named test with the password test.
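The startup script below expects this file at /data/kafka/config/kafka_server_jaas.conf; a minimal sketch for placing it on every node and tightening its permissions (the path is taken from that script, adjust it if yours differs):

mkdir -p /data/kafka/config
cp kafka_server_jaas.conf /data/kafka/config/
chmod 600 /data/kafka/config/kafka_server_jaas.conf
scp /data/kafka/config/kafka_server_jaas.conf root@192.172.21.121:/data/kafka/config/
scp /data/kafka/config/kafka_server_jaas.conf root@192.172.21.122:/data/kafka/config/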
7. Initialize the cluster data directories
1. First generate a unique ID for the storage directories.
bin/kafka-storage.sh random-uuid
Output ID: Mu_PwVjLQGGYBcE_EjCfmA
2. Format the Kafka storage directories with that ID (this must be run on every node):
bin/kafka-storage.sh format -t 7TraW-eCQXCx-HYoNY5VKw -c /data/kafka/kafka_2.13-3.7.0/config/kraft/server.properties
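To confirm the formatting worked, you can inspect the meta.properties that kafka-storage.sh writes into the log.dirs directory configured above; the cluster.id it records should match the ID passed to format:

cat /data/kafka/datas/meta.properties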
8. Start the cluster
1. Configure the startup script for the Kafka service
cp kafka-server-start.sh kafka-server-start-sasl.sh

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the License); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # Point java.security.auth.login.config at the kafka_server_jaas.conf created earlier
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka/config/kafka_server_jaas.conf"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
For kafka_2.13-3.6.0-1, kafka_2.13-3.6.0-2 and kafka_2.13-3.6.0-3 the modified section is:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka-cluster/global_config/kafka_server_jaas.conf"
fi

2. Start Kafka on each node in turn
kafka-server-start-sasl.sh -daemon /data/kafka/kafka_2.13-3.7.0/config/kraft/server.properties
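A quick sanity check after starting each node; the ports come from the listeners configured above, and the log path assumes the default logs directory under the installation:

ss -lntp | grep -E '19092|19093'
tail -n 50 /data/kafka/kafka_2.13-3.7.0/logs/server.log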
9. Test the cluster with the command-line tools
1. First create an authentication file for the clients
vim jaas.properties
2. Configure a user in it:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Every command run from here on must include --command-config ./jaas.properties for user authentication
3. Create the topic create-for-test (from the bin directory)
bin/kafka-topics.sh --bootstrap-server 192.172.21.120:19092 --create --topic create-for-test --partitions 1 --replication-factor 1 --command-config /data/kafka/config/jaas.properties

4. List the topics; you should only see create-for-test
bin/kafka-topics.sh --bootstrap-server 192.172.21.120:19092 --list --command-config /data/kafka/config/jaas.properties
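Because allow.everyone.if.no.acl.found=false, only the superuser admin can use the topic at this point. If you also want the test user defined earlier to produce and consume, a hedged sketch of granting it ACLs (the operations and the wildcard consumer group are only examples):

bin/kafka-acls.sh --bootstrap-server 192.172.21.120:19092 --command-config /data/kafka/config/jaas.properties --add --allow-principal User:test --operation Read --operation Write --topic create-for-test
bin/kafka-acls.sh --bootstrap-server 192.172.21.120:19092 --command-config /data/kafka/config/jaas.properties --add --allow-principal User:test --operation Read --group '*'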
5. To test consumption, first create kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="password";
};
6. Modify the kafka-console-producer.sh and kafka-console-consumer.sh startup scripts (both need to be changed)
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the License); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
# Add -Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
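As an alternative to editing the two scripts, kafka-run-class.sh also picks up the KAFKA_OPTS environment variable, so you can export the JAAS setting in your shell instead (same client JAAS path as above):

export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"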
7. Start the console producer, send a few messages, and wait to check them on the consumer side
./kafka-console-producer.sh --bootstrap-server 192.172.21.120:19092 --topic create-for-test --producer.config /data/kafka/config/jaas.properties

8. Consume the data; once the messages sent from the producer console show up here, the test is complete
./kafka-console-consumer.sh --bootstrap-server 192.172.21.120:19092 --topic create-for-test --from-beginning --consumer.config /data/kafka/config/jaas.properties

9. Delete the test topic
bin/kafka-topics.sh --bootstrap-server 192.172.21.120:19092 --delete --topic create-for-test --command-config /data/kafka/config/jaas.properties
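Optionally list the topics once more to confirm the test topic is gone:

bin/kafka-topics.sh --bootstrap-server 192.172.21.120:19092 --list --command-config /data/kafka/config/jaas.properties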
If you do not need SASL authentication, see https://www.cnblogs.com/fanqisoft/p/18027195
If anything is unclear, feel free to contact the blogger.