在连接Kerberos认证的kafka之前,需要了解Kerberos协议。 二、什么是Kerberos协议 Kerberos是一种计算机网络认证协议,其设计目标是通过密钥系统为网络中通信的客户机(Client)/服务器(Server)应用程序提供严格的身份验证服务,确保通信双方身份的真实性和安全性。不同于其他网络服务,Kerberos协议中并不是所有的客户端向想要访问的网络服务发起请求后就能建立连接、进行加密通信,而是在发起服务请求后必须先进行一系列的身份认证,包括客户端和服务端两方的双向认证;只有当通信双方都认证通过对方身份之后,才可以互相建立起连接进行网络通信。即Kerberos协议的侧重点在于认证通信双方的身份:客户端需要确认即将访问的网络服务就是自己想要访问的服务而不是一个伪造的服务器,而服务端需要确认这个客户端是一个身份真实、安全可靠的客户端,而不是一个想要进行恶意网络攻击的用户。 三、Kerberos协议角色组成 Kerberos协议中存在三个角色,分别是: 客户端(Client),发送请求的一方; 服务端(Server),接收请求的一方; 密钥分发中心(Key Distribution Center, KDC)。 一、首先需要准备三个文件: user.keytab、krb5.conf、jaas.conf
其中user.keytab和krb5.conf是两个认证文件,需要厂商提供——你连接谁的kafka,就让谁提供。
jaas.conf文件(注意:不是"jass.conf")需要自己在本地创建。
项目的application.yml配置内容如下(具体路径和域名需要换成自己的):
debug: true
fusioninsight:
  kafka:
    bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007
    security:
      protocol: SASL_PLAINTEXT
    kerberos:
      domain:
        name: hadoop.798687_97_4a2b_9510_00359f31c5ec.com
    sasl:
      kerberos:
        service:
          name: kafka
其中 kerberos.domain.name 的值 hadoop.798687_97_4a2b_9510_00359f31c5ec.com 需要换成现场提供给你的域名。
二、文件准备好后,可以将三个配置文件放在自己项目中,也可以放在服务器的某个目录下,只要确保项目启动后能读取到即可。我的目录结构如下。 pom依赖:我用的是华为云的Kafka依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-sample-01</artifactId>
    <version>2.3.1.RELEASE</version>
    <packaging>jar</packaging>
    <name>kafka-sample-01</name>
    <description>Kafka Sample 1</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <!-- Exclude the stock client so the Huawei-patched kafka-clients below is used instead. -->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Huawei FusionInsight build of the Kafka client (resolved from the huaweicloudsdk repository). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0-hw-ei-302002</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Huawei Kafka component: alternative system-scope jar, kept for reference -->
        <!--
        <dependency>
            <groupId>com.huawei</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>
        </dependency>
        -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>huaweicloudsdk</id>
            <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
        <repository>
            <id>central</id>
            <name>Maven Central</name>
            <url>https://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>
</project>
然后在SpringBoot项目中,启动类如下:
package com.example;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import com.common.Foo1;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;

/**
 * @author
 */
SpringBootApplication
public class Application {private final Logger logger LoggerFactory.getLogger(Application.class);Value(${fusioninsight.kafka.bootstrap-servers})public String boostrapServers;Value(${fusioninsight.kafka.security.protocol})public String securityProtocol;Value(${fusioninsight.kafka.kerberos.domain.name})public String kerberosDomainName;Value(${fusioninsight.kafka.sasl.kerberos.service.name})public String kerberosServiceName;public static void main(String[] args) {
// String filePath System.getProperty(user.dir) File.separator src File.separator main
// String filePath D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\;String filePath /home/yxxt/;System.setProperty(java.security.auth.login.config, filePath jaas.conf);System.setProperty(java.security.krb5.conf, filePath krb5.conf);SpringApplication.run(Application.class, args);}Beanpublic ConcurrentKafkaListenerContainerFactory?, ? kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactoryObject, Object kafkaConsumerFactory, KafkaTemplateString, String template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactoryObject, Object factory new ConcurrentKafkaListenerContainerFactory();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消费监听该topic有消息时立刻消费KafkaListener(id fooGroup1, topics topic_ypgk)public void listen(ConsumerRecordString, String record) {System.out.println(监听到了消息-----);logger.info(Received:消息监听成功 );System.out.println(监听到了-----);System.out.println(record);
// if (foo.getFoo().startsWith(fail)) {
// // 触发83行的 ErrorHandler将异常数据写入 topic名称.DLT的新topic中
// throw new RuntimeException(failed);
// }}// 创建topic指定分区数、副本数
// Bean
// public NewTopic topic() {
// return new NewTopic(topic1, 1, (short) 1);
// }Beanpublic KafkaAdmin kafkaAdmin() {MapString, Object configs new HashMap();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put(sasl.kerberos.service.name, kerberosServiceName);configs.put(kerberos.domain.name, kerberosDomainName);return new KafkaAdmin(configs);}Beanpublic ConsumerFactoryObject, Object consumerFactory() {MapString, Object configs new HashMap();configs.put(security.protocol, securityProtocol);configs.put(kerberos.domain.name, kerberosDomainName);configs.put(bootstrap.servers, boostrapServers);configs.put(sasl.kerberos.service.name, kerberosServiceName);configs.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);configs.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);return new DefaultKafkaConsumerFactory(configs);}Beanpublic KafkaTemplateString, String kafkaTemplate() {MapString, Object configs new HashMap();configs.put(security.protocol, securityProtocol);configs.put(kerberos.domain.name, kerberosDomainName);configs.put(bootstrap.servers, boostrapServers);configs.put(sasl.kerberos.service.name, kerberosServiceName);configs.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);configs.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerFactoryString, String producerFactory new DefaultKafkaProducerFactory(configs);return new KafkaTemplate(producerFactory);}
}
生产者:通过发送HTTP请求向主题发送消息。
package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** author haosuwei**/
RestController
public class Controller {Autowiredprivate KafkaTemplateString, String template;PostMapping(path /send/foo/{what})public void sendFoo(PathVariable String what) {Foo1 foo1 new Foo1(what);this.template.send(topic1, foo1.toString());}}
运行成功后,就可以监听到主题消息了。