钓鱼网站,免域名x网站,怎么免费安装dw,亩地 wordpress1. 安装
单机安装kafka Kafka对于zookeeper是强依赖#xff0c;保存kafka相关的节点数据#xff0c;所以安装Kafka之前必须先安装zookeeper dockerhub网址: https://hub.docker.com
Docker安装zookeeper
下载镜像#xff1a;
docker pull zookeeper:3.4.14创建容器
doc…1. 安装
单机安装kafka Kafka对于zookeeper是强依赖保存kafka相关的节点数据所以安装Kafka之前必须先安装zookeeper dockerhub网址: https://hub.docker.com
Docker安装zookeeper
下载镜像
docker pull zookeeper:3.4.14创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (用这个会有问题因为创建容器时参数设置与wurstmeister/kafka不同)创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \
--nethost wurstmeister/kafka:latest
测试 终端窗口A
[root192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181 (创建主题)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test (创建生产者)
hello (发送消息)
haha终端窗口B
[root192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning (创建接收者)
hello (收到了消息)
haha安装kafka可视化工具运行容器后打不开不知道为啥
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST192.168.200.131:2181 nickzurich/efak:latest集群安装
kafka.yml
version: 3.8
services:zookeeper:image: zookeeper:3.7.0restart: alwayshostname: 192.168.200.131container_name: zookeeperprivileged: trueports:- 2181:2181volumes:- /usr/local/server/zookeeper/data/:/databuild:context: .network: hostkafka1:container_name: kafka1restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9092:9092- 19092:19092environment:KAFKA_BROKER_ID: 1HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9092KAFKA_PORT: 9092KAFKA_delete_topic_enable: trueKAFKA_JMX_OPTS: -Dcom.sun.management.jmxremotetrue -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port19092JMX_PORT: 19092volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka2:container_name: kafka2restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9093:9093- 19093:19093environment:KAFKA_BROKER_ID: 2HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9093KAFKA_PORT: 9093KAFKA_delete_topic_enable: trueKAFKA_JMX_OPTS: -Dcom.sun.management.jmxremotetrue -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port19093JMX_PORT: 19093volumes:/etc/localtime:/etc/localtimedepends_on:zookeeperkafka3:container_name: kafka3restart: alwaysimage: wurstmeister/kafka:latestprivileged: trueports:- 9094:9094- 19094:19094environment:KAFKA_BROKER_ID: 3HOST_IP: 192.168.200.131KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094 ## 宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181#docker部署必须设置外部可访问ip和端口否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131KAFKA_ADVERTISED_PORT: 9094KAFKA_PORT: 9094KAFKA_delete_topic_enable: trueKAFKA_JMX_OPTS: -Dcom.sun.management.jmxremotetrue -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port19094JMX_PORT: 19094volumes:/etc/localtime:/etc/localtimedepends_on:zookeepereagle:image: gui66497/kafka_eaglecontainer_name: eagle_monitorrestart: alwaysdepends_on:- kafka1- kafka2- kafka3ports:- 8048:8048environment:ZKSERVER: 192.168.200.131:2181命令
docker-compose -f kafka.yml up -d docker-compose -f kafka.yml down docker-compose -f kafka.yml ps
[root192 images]# ls
kafka.yml
[root192 images]# docker-compose -f kafka.yml up -d
[] Running 6/6⠿ Network images_default Created 0.1s⠿ Container kafka2 Started 1.0s⠿ Container kafka3 Started 1.0s⠿ Container zookeeper Started 1.0s⠿ Container kafka1 Started 1.0s⠿ Container eagle_monitor Started 1.5s
[root192 images]#
// 但是还是用不了eagle不知道为啥防火墙是已经关了
2. springboot集成
目前springboot整合后的kafka因为序列化器是StringSerializer这个时候如果需要传递对象可以有两种方式
方式一可以自定义序列化器对象类型众多这种方式通用性不强这里不介绍
方式二可以把要传递的对象进行转json字符串接收消息后再转为对象即可这里采用这种方式
2.1 创建单点kafka和topic
[root192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root192 images]# docker run -d --name kafka \--env KAFKA_ADVERTISED_HOST_NAME192.168.200.131 \--env KAFKA_ZOOKEEPER_CONNECT192.168.200.131:2181 \--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.200.131:9092 \--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \--nethost wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.
2.2 创建生产者
dependencies
!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version/dependencyapplication.yml
server:port: 8080
spring:application:name: kafka-producerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializercontroller-发送消息
RestController
public class HelloController {Autowiredprivate KafkaTemplateString,String kafkaTemplate;GetMapping(/hello)public String hello(){kafkaTemplate.send(test,springboot发的第一条消息);return ok;}GetMapping(/helloUser)public String helloUser(){User user new User();user.setName(xiaowang);user.setAge(18);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return ok;}
}User
public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return User{ name name \ , age age };}
}2.3 创建消费者
dependencies
!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.83/version/dependencyapplication.yml
server:port: 8081
spring:application:name: kafka-consumerkafka:bootstrap-servers: 192.168.200.131:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerUser
public class User {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return User{ name name \ , age age };}
}消息监听器
Component
public class HelloListener {KafkaListener(topics test)public void onMessage1(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}KafkaListener(topics user-topic)public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user JSON.parseObject(message, User.class);System.out.println(user.toString());}}
}启动生产者和消费者项目浏览器输入http://127.0.0.1:8080/hello发现消费者收到消息 浏览器输入http://127.0.0.1:8080/helloUser发现消费者收到消息 项目结构
3.其它
通常在监听类直接调用service方法
Component
Slf4j
public class ArtilceIsDownListener {Autowiredprivate ApArticleConfigService apArticleConfigService;KafkaListener(topics WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info(article端文章配置修改articleId{},map.get(articleId));}}
}