教育网站设计案例,学校网站设计,搭建视频网页,建设电子商务网站的必要性需求
我们需要通过Spring Kafka库#xff0c;将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入#xff0c;SASL 采用 SCRAM-SHA-256 方式加解密。
pom.xml
dependencygroupIdorg.springframew…需求
我们需要通过Spring Kafka库将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入SASL 采用 SCRAM-SHA-256 方式加解密。
pom.xml
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency我这里不需要写版本号应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。
application.yml
spring:kafka:producer:# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092client-id: producer-dev# SASL_PLAINTEXT 接入方式security:protocol: SASL_PLAINTEXT# 反序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# SASL 采用 SCRAM-SHA-256 方式sasl:mechanism: SCRAM-SHA-256# jaas配置jaas:options:username: kafkauserpassword: kafkapwdenabled: truelogin-module: org.apache.kafka.common.security.scram.ScramLoginModulecontrol-flag: required以上是关于Spring Kafka的全部配置。下面摘要出来的配置是可以单独配置在配置中心的
topic:# 接收消息的主题配置save: hello_kafka_topic
spring:kafka:producer:client-id: producer-dev# kafka集群地址bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092# jaas配置jaas:options:username: kafkauserpassword: kafkapwdJava
KafkaProducerService.java public interface KafkaProducerService {/*** 转发消息到kafka*/void sendToKafka(String msg);}KafkaProducerServiceImpl.java
import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;/*** 转发消息到kafka*/
RefreshScope
Slf4j
Service
public class KafkaProducerServiceImpl implements KafkaProducerService {Resourceprivate KafkaTemplateString, String kafkaTemplate;/*** kafka接收消息的主题*/Value(${topic.save})private String topic;Overridepublic void sendToKafka(String msg) {log.info(String.format($$$$ Producing message: %s, msg));ProducerRecordString, String recordKafka new ProducerRecord(topic, msg);ListenableFutureSendResultString, String future kafkaTemplate.send(recordKafka);future.addCallback(new KafkaSendCallbackString, String() {Overridepublic void onSuccess(SendResultString, String result) {log.info(成功发消息:{}给Kafka:{}, msg, result);}Overridepublic void onFailure(KafkaProducerException ex) {log.error(发消息:{}给Kafka:{}, msg, recordKafka, ex);}});}
}
到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。
总结
这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本所以这里的使用的回调类是ListenableFuture类。
参考
Spring for Apache Kafka2.8.3Spring for Apache Kafka