SpringBoot Integration with Spring Cloud Stream 3.1: Kafka Dead Letter Queue
Previous post: SpringBoot Integration with Spring Cloud Stream 3.1 Kafka
Steps to implement the dead letter queue
1. Add the dead letter queue configuration file.
2. Add the corresponding channel.
3. Point the channel binding configuration at that channel.
4. Add the retry configuration.
5. Verify the result.

Configuration files
Basic Kafka configuration: application-mq.yml
```yaml
server:
  port: 7105
spring:
  application:
    name: betrice-message-queue
  config:
    import:
      - classpath:application-bindings.yml
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            enable.auto.commit: false
      binders:
        betrice-kafka:
          type: kafka
          environment:
            spring.kafka:
              bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}
```

Create the dead letter queue configuration file application-dql.yml:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          dqlTransfer-in-0:
            consumer:
              # When set to true, it enables DLQ behavior for the consumer. By default, messages
              # that result in errors are forwarded to a topic named error.<destination>.<group>.
              # Messages sent to the DLQ topic are enhanced with the following headers:
              # x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].
              # By default, a failed record is sent to the same partition number in the DLQ
              # topic as the original record.
              enableDlq: true
              dlqName: Evad05-message-dlq
              keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              # valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              valueSerde: com.devilvan.pojo.Evad05MessageSerde
              autoCommitOnError: true
              autoCommitOffset: true
```

Note: valueSerde is set to a custom object type here, so it must be paired with content-type application/json; after the consumer receives the message, it is converted to a JSON string.
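The custom class com.devilvan.pojo.Evad05MessageSerde is referenced above but its source is never shown in the article. A minimal sketch of what it could look like, assuming it is a plain POJO (only the data and count accessors are confirmed by the controller code below; everything else is guesswork):

```java
package com.devilvan.pojo;

// Hypothetical reconstruction of the payload type used as valueSerde above.
public class Evad05MessageSerde {

    private String data; // message body
    private int count;   // example counter field

    public String getData() { return data; }
    public void setData(String data) { this.data = data; }

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public String toString() {
        return "Evad05MessageSerde{data='" + data + "', count=" + count + "}";
    }
}
```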
Add the configuration to the channel binding file application-bindings.yml
The channel here corresponds to dqlTransfer-in-0 in the configuration file above.

```yaml
spring:
  cloud:
    stream:
      betrice-default-binder: betrice-kafka
      function:
        # Declare the channels: transfer receives the producer's messages and hands them to sink after processing
        definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
      bindings:
        # Producer binding: output goes to the topic named by destination
        dqlTransfer-in-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          group: evad05DlqConsumer # a consumer group is required for the dead letter queue
          content-type: application/json
          consumer:
            maxAttempts: 2 # maximum number of attempts to consume a failed message (the publisher redelivers it); default 3
            backOffInitialInterval: 1000 # initial interval before retrying a failed message; default 1s, i.e. the first retry happens 1s later
            backOffMultiplier: 2 # ratio between two consecutive retry intervals; default 2, i.e. the second interval is twice the first, the third twice the second
            backOffMaxInterval: 10000 # maximum interval before the next retry; default 10000ms, i.e. 10s
        dqlTransfer-out-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
        # Consume messages from the dead letter queue
        evad05DlqConsumer-in-0:
          destination: Evad05-message-dlq
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
```
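As a quick illustration (not from the original article) of how the retry settings above play out: with maxAttempts: 2 a failing record is attempted twice in total, and the single retry fires roughly backOffInitialInterval after the first failure. The snippet below just prints the back-off intervals that further retries would use if maxAttempts allowed them:

```java
// Illustration only: computes the back-off intervals implied by the YAML above.
public class BackOffIllustration {
    public static void main(String[] args) {
        long interval = 1000;     // backOffInitialInterval (ms)
        double multiplier = 2.0;  // backOffMultiplier
        long maxInterval = 10000; // backOffMaxInterval (ms)
        for (int attempt = 2; attempt <= 6; attempt++) {
            System.out.printf("attempt %d would run %d ms after the previous one%n", attempt, interval);
            interval = (long) Math.min(interval * multiplier, maxInterval);
        }
        // With maxAttempts: 2, only attempt 2 actually happens; after it fails,
        // the record is published to the DLQ topic (Evad05-message-dlq).
    }
}
```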
Controller

Send a message and route it into the dead letter queue.
```java
import javax.annotation.Resource;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.devilvan.pojo.Evad05MessageSerde;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {

    @Resource(name = "streamBridgeUtils")
    private StreamBridge streamBridge;

    @PostMapping("streamSend")
    public void streamSend(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Message sent: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendDql")
    public void streamSendDql(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Message sent: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendJsonDql")
    public void streamSendJsonDql(String topic) {
        try {
            Evad05MessageSerde message = new Evad05MessageSerde();
            message.setData("evad05 test dql");
            message.setCount(1);
            streamBridge.send(topic, message);
            log.info("Message sent: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }
}
```
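To exercise the flow end to end, POST to the endpoint with any HTTP client. The sketch below is an assumption, not part of the article: the path and parameters come from the mappings above, the port from application-mq.yml, and it passes Evad10 as the topic so that StreamBridge publishes to the destination consumed by dqlTransfer-in-0:

```java
import org.springframework.web.client.RestTemplate;

// Hypothetical manual test: the message lands on topic Evad10, fails in
// dqlTransfer(), is retried once, and is then routed to Evad05-message-dlq.
public class BetriceMqManualTest {
    public static void main(String[] args) {
        new RestTemplate().postForObject(
                "http://localhost:7105/betriceMqController/streamSendDql"
                        + "?topic=Evad10&message=hello-dlq",
                null, Void.class);
    }
}
```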
Channel
Here the dqlTransfer channel is used: the message arrives from the Evad10 topic, passes through the dqlTransfer() method, which throws an exception, and the message then enters the corresponding dead letter queue.
```java
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BetriceMqSubChannel {

    @Bean
    public Function<String, String> dqlTransfer() {
        return message -> {
            System.out.println("transfer: " + message);
            // Always fail, so the message is retried and finally sent to the DLQ
            throw new RuntimeException("dead letter queue test");
        };
    }

    @Bean
    public Consumer<String> evad05DlqConsumer() {
        return message -> System.out.println("Topic: evad05 Dlq Consumer: " + message);
    }
}
```
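The binder also attaches the x-original-topic, x-exception-message, and x-exception-stacktrace headers mentioned in application-dql.yml. As a sketch of how they could be inspected (this consumer is not part of the article; it would additionally need to be added to spring.cloud.stream.function.definition and bound to the DLQ topic):

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

// Hypothetical consumer that logs the DLQ metadata headers added by the Kafka binder.
@Configuration
public class DlqHeaderInspection {

    @Bean
    public Consumer<Message<String>> evad05DlqHeaderConsumer() {
        return message -> {
            byte[] originalTopic = message.getHeaders().get("x-original-topic", byte[].class);
            byte[] exceptionMessage = message.getHeaders().get("x-exception-message", byte[].class);
            System.out.println("from topic: "
                    + (originalTopic == null ? "?" : new String(originalTopic, StandardCharsets.UTF_8)));
            System.out.println("failure reason: "
                    + (exceptionMessage == null ? "?" : new String(exceptionMessage, StandardCharsets.UTF_8)));
            System.out.println("payload: " + message.getPayload());
        };
    }
}
```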
Converting the custom serialized type to a JSON message
Steps
1. Add the custom serde class to the valueSerde property (shown above in application-dql.yml).
2. In BetriceMqController, wrap an object of the custom type and send it as the message:
```java
@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
    try {
        Evad05MessageSerde message = new Evad05MessageSerde();
        message.setData("evad05 test dql");
        message.setCount(1);
        streamBridge.send(topic, message);
        log.info("Message sent: " + message);
    } catch (Exception e) {
        log.error("Exception: " + e);
    }
}
```

3. The channel (BetriceMqSubChannel) receives the message and deserializes it:
```java
// JSON here is assumed to be com.alibaba.fastjson.JSON
@Bean
public Consumer<String> evad05DlqConsumer() {
    return message -> System.out.println("Topic: evad05 Dlq Consumer: "
            + JSON.parseObject(message, Evad05MessageSerde.class));
}
```

4. Check the result.

References
Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列 - it噩梦的博客 (CSDN)