网站建设net接口,四川省城乡建设网网站,毕业设计购物网站开发的意义,十大网络安全上市公司说明
CompletableFuture对象需要的SpringBoot版本为3.X.X以上#xff0c;需要的kafka依赖版本为3.X.X以上#xff0c;需要的jdk版本17以上。
1、阻塞式#xff08;等待式#xff09;获取生产者发送的消息
生产者#xff1a;
package com.power.producer;import org.ap…
说明
CompletableFuture对象需要的SpringBoot版本为3.X.X以上需要的kafka依赖版本为3.X.X以上需要的jdk版本17以上。
1、阻塞式等待式获取生产者发送的消息
生产者
package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;Component
public class EventProducer {Resourceprivate KafkaTemplateString,String kafkaTemplate;public void getResult(){//Integer partition, Long timestamp, K key, Nullable V dataCompletableFutureSendResultString, String result kafkaTemplate.sendDefault(0, System.currentTimeMillis(), k3, hello-kafka);//怎么拿结果通过ListenableFuture类拿结果try {//1、阻塞式等待拿结果SendResultString, String sendResult result.get();if(null!sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println(消息发送成功sendResult.getRecordMetadata().toString());}System.out.println(producerRecord:sendResult.getProducerRecord());} catch (Exception e) {e.printStackTrace();}}
}测试类
package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;SpringBootTest
public class SpringBoot01KafkaBaseApplication {Resourceprivate EventProducer eventProducer;Testvoid getResult(){eventProducer.getResult();}
}测试结果
消息发送成功default-topic-01
2024-08-22 22:18:51.344 INFO 8976 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientIdconsumer-hello-group-1, groupIdhello-group] Adding newly assigned partitions: hello-topic-0
producerRecord:ProducerRecord(topicdefault-topic, partition0, headersRecordHeaders(headers [], isReadOnly true), keyk3, valuehello-kafka, timestamp1724336330821)2、非阻塞式非等待式获取生产者发送的消息
生产者
package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;Component
public class EventProducer {Resourceprivate KafkaTemplateString,String kafkaTemplate;public void getResult2(){//Integer partition, Long timestamp, K key, Nullable V dataCompletableFutureSendResultString, String result kafkaTemplate.sendDefault(0, System.currentTimeMillis(), k3, hello-kafka);//怎么拿结果通过CompletableFuture类拿结果try {//2、非阻塞式等待拿结果result.thenAccept((sendResult)-{if(null!sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println(消息发送成功sendResult.getRecordMetadata().toString());}System.out.println(producerRecord:sendResult.getProducerRecord());}).exceptionally((e)-{e.printStackTrace();//做消息发送失败的处理System.out.println(消息发送失败);return null;});} catch (Exception e) {e.printStackTrace();}}}测试类
Test
void getResult2(){eventProducer.getResult2();
}