1 Confluent Introduction
Confluent is a streaming platform for managing and organizing data from different sources. It can integrate data from different sources and locations into a central event-streaming platform in real time, and it is highly reliable and performant.
Confluent currently ships in two editions: the free Community edition and the paid Commercial edition. The Community edition provides basic services such as Connectors, REST Proxy, KSQL, and Schema Registry. The Commercial edition adds enterprise features such as the Control Center dashboard, load balancing, cross-datacenter replication, and security hardening.
1.2 Service Descriptions
1.2.1 Zookeeper
ZooKeeper is an open-source distributed coordination service for applications; its main functions include centralized services such as configuration maintenance, naming, distributed synchronization, and group management. Kafka uses ZooKeeper to persist cluster metadata: if ZooKeeper loses Kafka's data, the replica assignments, topic definitions, and other configuration are lost, the Kafka cluster stops working properly, and data loss follows.
1.2.2 Kafka
Kafka is a distributed streaming platform: a partitioned, replicated, distributed messaging system coordinated by ZooKeeper. It is a high-throughput distributed publish-subscribe messaging system (message-queue middleware) whose main job is message transport, and Confluent relies on Kafka for exactly that. Kafka's defining feature is that it can process large volumes of data in real time to cover a wide range of scenarios.
1.2.3 Control Center
Control Center makes it easy to manage Kafka connections and to create, edit, and manage connections to other systems. We can monitor data streams from producer to consumer, verify that every message is delivered, and measure how long delivery takes. With Confluent Control Center, developers can build Kafka-based data pipelines without writing a single line of code.
1.2.4 Kafka-rest
Kafka-rest is Kafka's RESTful interface service component. It lets you produce and consume messages over a RESTful interface instead of the native Kafka protocol or clients, and it also exposes cluster status and administrative operations.
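As a quick illustration, producing a message and listing topics through the REST proxy looks roughly like this. The address matches the kafka-rest configuration later in this guide; the topic name "test" is illustrative, and the calls only succeed against a running cluster:

```shell
# kafka-rest address as configured later in this guide (assumption)
REST_PROXY="http://10.0.165.8:8082"

# Produce a JSON record to topic "test" via the v2 REST API.
# --max-time keeps the example from hanging when the cluster is down.
curl --max-time 5 -s -X POST \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"value":{"msg":"hello"}}]}' \
  "$REST_PROXY/topics/test" || echo "cluster not reachable"

# List the topics visible to the proxy
curl --max-time 5 -s "$REST_PROXY/topics" || echo "cluster not reachable"
```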
1.2.5 Schema-Registry
Schema-Registry is a metadata-management service that likewise exposes a RESTful interface for storing and retrieving schemas. It keeps every version of a data format as it evolves and maintains backward compatibility. Schema-Registry also provides Kafka with an Avro serialization plugin for message transport. Confluent uses Schema-Registry mainly to manage data schemas and to handle serialization.
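For example, registering an Avro schema and listing subjects over the REST interface looks roughly like this. The registry address matches the configuration below; the subject name "test-value" and the User schema are illustrative:

```shell
# Schema Registry address as configured later in this guide (assumption)
SR_URL="http://10.0.165.8:8081"

# Register an Avro schema under the subject "test-value"
curl --max-time 5 -s -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \
  "$SR_URL/subjects/test-value/versions" || echo "registry not reachable"

# List all registered subjects
curl --max-time 5 -s "$SR_URL/subjects" || echo "registry not reachable"
```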
1.2.6 Connect
Kafka Connect is an open-source Kafka component: a framework for connecting Kafka to external systems such as databases, key-value stores, search systems, and file systems. Using the Connect framework together with existing connectors, you can read messages from a source into Kafka and write messages from Kafka out to a destination.
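Connectors are deployed through Connect's REST API. A minimal sketch, assuming the endpoint configured below and an illustrative FileStreamSource connector tailing a local file into a topic:

```shell
# Kafka Connect REST endpoint as configured later in this guide (assumption)
CONNECT_URL="http://10.0.165.8:8083"

# Create a FileStreamSource connector that reads /tmp/test.txt into
# the topic "connect-test" (name, file, and topic are illustrative).
curl --max-time 5 -s -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "file-source",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "tasks.max": "1",
      "file": "/tmp/test.txt",
      "topic": "connect-test"
    }
  }' \
  "$CONNECT_URL/connectors" || echo "connect not reachable"

# List deployed connectors
curl --max-time 5 -s "$CONNECT_URL/connectors" || echo "connect not reachable"
```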
1.2.7 ksql-server
KSQL is a streaming SQL engine that runs stream-processing tasks against Apache Kafka using SQL statements. Confluent uses KSQL to provide query services over Kafka data.
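To give a flavor of KSQL, the sketch below pipes two statements into the `ksql` CLI shipped with Confluent 5.x. The server address matches the ksql-server configuration later in this guide; the `pageviews` stream and its topic are illustrative, and the statements only run against a live ksql-server:

```shell
# ksql-server address as configured later in this guide (assumption)
KSQL_URL="http://10.0.165.8:8088"

# Define a stream over an existing topic, then aggregate it by user.
# Run from the confluent installation directory.
bin/ksql "$KSQL_URL" <<'EOF' || echo "ksql-server not reachable"
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');
SELECT userid, COUNT(*) FROM pageviews GROUP BY userid;
EOF
```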
2 Downloading Confluent
This guide uses the open-source Confluent 5.2.4.
Download link: http://packages.confluent.io/archive/5.2/confluent-5.2.4-2.11.tar.gz
3 Environment Preparation
At least 3 nodes are recommended for a distributed deployment, but since this is a test environment and nodes are scarce, 2 nodes are used here.
Node       | zookeeper | kafka | control-center | kafka-rest | schema-registry | connector | ksql-server
10.0.165.8 | √         | √     | √              | √          | √               | √         | √
10.0.165.9 | √         | √     | √              | √          | √               | √         | √
Port       | 2181      | 9092  | 9021           | 8082       | 8081            | 8083      | 8088
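After installation, the port layout in the table above can be sanity-checked with a simple reachability loop (a sketch using bash's /dev/tcp; it assumes the hosts and ports of this guide and reports "closed" for anything not yet running):

```shell
# Hosts and service ports from the table above
HOSTS="10.0.165.8 10.0.165.9"
PORTS="2181 9092 9021 8082 8081 8083 8088"

for host in $HOSTS; do
  for port in $PORTS; do
    # /dev/tcp open succeeds only if something is listening;
    # timeout caps each probe at 1 second.
    if timeout 1 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "$host:$port open"
    else
      echo "$host:$port closed"
    fi
  done
done
```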
4 Installation
4.1 Extract
Upload the downloaded archive to the Linux host, then extract it to the target directory:
tar -zxvf /opt/package/confluent-5.2.4-2.11.tar.gz -C /home/kafka/.local/
Rename the directory and change into it:
mv /home/kafka/.local/confluent-5.2.4 /home/kafka/.local/confluent
cd /home/kafka/.local/confluent
4.2 Modify Configuration
Modify the configuration on the 10.0.165.8 node.
4.2.1 ZooKeeper configuration
(1) vim /home/kafka/.local/confluent/etc/kafka/zookeeper.properties
## Data directory; the default /tmp/zookeeper risks being deleted
dataDir=/data/confluent/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
## For multiple zookeeper servers, the server number (1, 2, ...) must match the value in myid
server.1=10.0.165.8:2888:3888
server.2=10.0.165.9:2888:3888
(2) Generate myid:
echo 1 > /home/kafka/.local/confluent/etc/kafka/myid
(3) Modify the confluent startup script so that myid is copied into the confluent run directory.
bin/confluent start launches all Confluent services and copies the configuration under etc/ into the confluent run directory.
vim /home/kafka/.local/confluent/bin/confluent
Add this as the last line of the config_zookeeper() block:
cp ${confluent_conf}/kafka/myid $confluent_current/zookeeper/data/
This copies etc/kafka/myid into the confluent run directory; otherwise ZooKeeper fails to start with a "myid is not found" error.
4.2.2 Kafka configuration
vim /home/kafka/.local/confluent/etc/kafka/server.properties
broker.id=0
# Only one of listeners / advertised.listeners may be needed, depending on the machine's
# network interfaces. advertised.listeners is usually more portable; set it to this
# machine's own IP and port; other machines' IPs need not be configured here.
advertised.listeners=PLAINTEXT://10.0.165.8:9092
## Adjust the following to the actual environment
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
# log.dirs is the most important setting: where Kafka's data lives
log.dirs=/data/confluent/kafka-logs
num.partitions=12
num.recovery.threads.per.data.dir=1
message.max.bytes=10000000
replica.fetch.max.bytes=10485760
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
## Replication factor; must not exceed the number of Kafka nodes, or errors occur
default.replication.factor=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=2000
log.retention.check.interval.ms=300000
log.cleaner.enable=true
## Log retention time in hours
log.retention.hours=48
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
confluent.metrics.reporter.bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
confluent.metrics.reporter.topic.replicas=2
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous
delete.topic.enable=true
group.initial.rebalance.delay.ms=0

4.2.3 kafka-rest
vim /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties
id=kafka-rest-server-001
schema.registry.url=http://10.0.165.8:8081
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
bootstrap.servers=PLAINTEXT://10.0.165.8:9092
port=8082
consumer.threads=8
access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
access.control.allow.origin=*

4.2.4 ksql
(This component does not exist in Confluent 4.)
vim /home/kafka/.local/confluent/etc/ksql/ksql-server.properties
ksql.service.id=default_
bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
listeners=http://0.0.0.0:8088
ksql.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081
ksql.sink.partitions=4
4.2.5 confluent-control-center
vim /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties
bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
zookeeper.connect=10.0.165.8:2181,10.0.165.9:2181
confluent.controlcenter.rest.listeners=http://0.0.0.0:9021
# Each id must be unique, otherwise only one instance can start
confluent.controlcenter.id=1
confluent.controlcenter.data.dir=/data/confluent/control-center
confluent.controlcenter.connect.cluster=http://10.0.165.8:8083,http://10.0.165.9:8083
## Configure each host with its own IP
confluent.controlcenter.ksql.url=http://10.0.165.8:8088
confluent.controlcenter.schema.registry.url=http://10.0.165.8:8081,http://10.0.165.9:8081
confluent.controlcenter.internal.topics.replication=2
confluent.controlcenter.internal.topics.partitions=2
confluent.controlcenter.command.topic.replication=2
confluent.monitoring.interceptor.topic.partitions=2
confluent.monitoring.interceptor.topic.replication=2
confluent.metrics.topic.replication=2
confluent.controlcenter.streams.num.stream.threads=30
4.2.6 schema-registry
vim /home/kafka/.local/confluent/etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.0.165.8:9092,10.0.165.9:9092
kafkastore.topic=_schemas
debug=false
4.2.7 connect
vim /home/kafka/.local/confluent/etc/schema-registry/connect-avro-distributed.properties
bootstrap.servers=10.0.165.8:9092,10.0.165.9:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=2
offset.storage.replication.factor=2
status.storage.replication.factor=2
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
rest.advertised.port=8083
plugin.path=/home/kafka/.local/confluent/share/java

4.2.8 Copy confluent to the other node
scp -r confluent/ kafka@10.0.165.9:/home/kafka/.local/
Then modify the configuration on the other node:
vi myid
2

vi /home/kafka/.local/confluent/etc/kafka/server.properties
broker.id=1
advertised.listeners=PLAINTEXT://10.0.165.9:9092

vi /home/kafka/.local/confluent/etc/kafka-rest/kafka-rest.properties
id=kafka-rest-server-002
schema.registry.url=http://10.0.165.9:8081
bootstrap.servers=PLAINTEXT://10.0.165.9:9092

vi /home/kafka/.local/confluent/etc/confluent-control-center/control-center-dev.properties
confluent.controlcenter.id=2
confluent.controlcenter.ksql.url=http://10.0.165.9:8088

Then create the confluent directory under /data on both nodes and fix its ownership:
sudo mkdir /data/confluent
sudo chown kafka:kafka /data/confluent

4.3 Starting and Stopping Services
4.3.1 Start all services
Start: bin/confluent start
Check status: bin/confluent status
Stop: bin/confluent stop
4.3.2 Start services individually
Each service can also be started on its own.
Start kafka-rest:
bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties

The command above starts the service in the foreground; it can also be started in the background:
nohup bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties &

Start zookeeper:
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

Start the kafka broker:
bin/kafka-server-start -daemon etc/kafka/server.properties

Start schema registry:
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

5 Common Installation Errors
5.1 KafkaServer fails to start
[2020-06-27 04:28:15,713] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Socket server failed to bind to 10.0.165.8:9092: Cannot assign requested address.
	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:331)
	at kafka.network.Acceptor.<init>(SocketServer.scala:256)
	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:97)
	at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at kafka.network.SocketServer.startup(SocketServer.scala:89)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
	at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
	at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
Caused by: java.net.BindException: Cannot assign requested address
	at sun.nio.ch.Net.bind0(Native Method)
	at sun.nio.ch.Net.bind(Net.java:433)
	at sun.nio.ch.Net.bind(Net.java:425)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)
	... 9 more
[2020-06-27 04:28:15,715] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2020-06-27 04:28:15,717] INFO [SocketServer brokerId=2] Shutting down (kafka.network.SocketServer)
[2020-06-27 04:28:15,718] INFO [SocketServer brokerId=2] Shutdown completed (kafka.network.SocketServer)
[2020-06-27 04:28:15,721] INFO Shutting down. (kafka.log.LogManager)
[2020-06-27 04:28:15,760] INFO Shutdown complete. (kafka.log.LogManager)
[2020-06-27 04:28:15,761] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-06-27 04:28:15,762] INFO Session: 0x27297ff0225a5a9 closed (org.apache.zookeeper.ZooKeeper)
[2020-06-27 04:28:15,764] INFO EventThread shut down for session: 0x27297ff0225a5a9 (org.apache.zookeeper.ClientCnxn)
[2020-06-27 04:28:15,765] INFO [KafkaServer id=2] shut down completed (kafka.server.KafkaServer)
[2020-06-27 04:28:15,766] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)

Cause: I copied server.properties to every node without changing the setting below. The listener configuration must point to each node's own hostname and port; I had pointed every machine at 10.0.165.8, so only node 1 worked.
advertised.listeners=PLAINTEXT://10.0.165.9:9092
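A quick sanity check before starting the broker is to confirm that the IP configured in the listener settings actually belongs to a local interface (a sketch; `10.0.165.9` stands in for this node's configured IP):

```shell
# IP configured in this node's listener settings (assumption)
ADVERTISED_IP="10.0.165.9"

# hostname -I lists all IPs bound to local interfaces on Linux
if hostname -I 2>/dev/null | grep -qw "$ADVERTISED_IP"; then
  echo "$ADVERTISED_IP is a local address"
else
  echo "$ADVERTISED_IP is NOT local: fix the listener settings on this node"
fi
```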
5.2 Confluent schema-registry fails to start
[2020-06-27 16:09:39,872] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, its crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:242)
[2020-06-27 16:09:50,095] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
java.lang.IllegalArgumentException: Unable to subscribe to the Kafka topic _schemas backing this data store. Topic may not exist.
	at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.<init>(KafkaStoreReaderThread.java:125)
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:130)
	at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:199)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:64)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:42)
	at io.confluent.rest.Application.createServer(Application.java:157)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)

Cause: the Kafka server had not been started.
6 Common Operations
(1) Start:
confluent start
(2) Show the log file directory:
confluent current
(3) List available connectors:
confluent list connectors
(4) Show loaded connectors:
confluent status connectors
[
  "file-source"
]
(5) Show a specific connector's status:
confluent status file-source
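The connector commands above are wrappers over the Kafka Connect REST API, so the same information is available directly over HTTP (endpoint assumed from the configuration earlier in this guide):

```shell
# Kafka Connect REST endpoint as configured earlier (assumption)
CONNECT_URL="http://10.0.165.8:8083"

# Roughly equivalent to "confluent status connectors"
curl --max-time 5 -s "$CONNECT_URL/connectors" || echo "connect not reachable"

# Roughly equivalent to "confluent status file-source"
curl --max-time 5 -s "$CONNECT_URL/connectors/file-source/status" || echo "connect not reachable"
```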