医院建设网站,网站下方一般放什么原因,在线看免费网站,中国能源建设集团招聘网站前言
Rocket的请求应答消息是指在使用Rocket#xff08;这里可能是RocketMQ或者Rocket框架#xff09;进行通信时#xff0c;客户端发送一个请求到服务端#xff0c;然后服务端处理该请求并返回一个响应的过程中的数据交换。
在RocketMQ中#xff1a;
请求应答消息通常…前言
Rocket的请求应答消息是指在使用Rocket这里可能是RocketMQ或者Rocket框架进行通信时客户端发送一个请求到服务端然后服务端处理该请求并返回一个响应的过程中的数据交换。
在RocketMQ中
请求应答消息通常涉及到以下几个步骤
生产者Producer创建一个消息并将其发送到Broker消息中间件服务器。Broker接收到消息后可能需要进行存储、路由或者其他处理操作。如果请求是需要立即响应的例如RPC调用Broker会在处理完消息后生成一个响应消息并通过网络返回给生产者。生产者接收到响应消息后可以根据响应内容进行相应的业务处理。
在Rocket框架中
请求应答消息通常涉及到HTTP请求和响应
客户端通常是Web浏览器或者API客户端向Rocket应用服务器发送一个HTTP请求请求可能包含JSON、XML或者其他格式的数据。Rocket框架接收到请求后根据路由规则将请求分发到对应的处理器函数handler。处理器函数处理请求这可能包括查询数据库、计算结果或者其他业务逻辑。处理完成后处理器函数构建一个HTTP响应响应中包含处理结果以及可能的状态码、头部信息等。Rocket框架将响应返回给客户端客户端解析响应并进行相应的处理。
无论是RocketMQ还是Rocket框架请求应答消息都是系统间或者组件间通信的基本机制用于实现功能调用、数据交换或者状态同步。
请求应答消息
这个消息类型比较有意思类似一种RPC的模式
生产者发送消息之后可以阻塞等待消费者消费这个消息的之后返回的结果
生产者通过过调用request方法发送消息接收回复消息
public class Producer {public static void main(String[] args) throws Exception {//创建一个生产者指定生产者组为 sanyouProducerDefaultMQProducer producer new DefaultMQProducer(sanyouProducer);// 指定NameServer的地址producer.setNamesrvAddr(192.168.200.143:9876);// 启动生产者producer.start();Message message new Message(sanyouTopic, 三友的java日记.getBytes());//发送消息拿到响应结果 3000代表超时时间3s内未拿到响应结果就超时会抛出RequestTimeoutException异常Message result producer.request(message, 3000);System.out.println(接收到响应消息 result);// 关闭生产者producer.shutdown();}}而对于消费者来着当消费完消息之后也要作为生产者将响应的消息发送出去
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//创建一个生产者指定生产者组为 sanyouProducerDefaultMQProducer producer new DefaultMQProducer(sanyouProducer);// 指定NameServer的地址producer.setNamesrvAddr(192.168.200.143:9876);// 启动生产者producer.start();// 通过push模式消费消息指定消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(sanyouConsumer);// 指定NameServer的地址consumer.setNamesrvAddr(192.168.200.143:9876);// 订阅这个topic下的所有的消息consumer.subscribe(sanyouTopic, *);// 注册一个消费的监听器当有消息的时候会回调这个监听器来消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf(消费消息:%s, new String(msg.getBody()) \n);try {// 用RocketMQ自带的工具类创建响应消息Message replyMessage MessageUtil.createReplyMessage(msg, 这是响应消息内容.getBytes(StandardCharsets.UTF_8));// 将响应消息发送出去拿到发送结果SendResult replyResult producer.send(replyMessage, 3000);System.out.println(响应消息的结果 replyResult);} catch (Exception e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.printf(Consumer Started.%n);}
}这种请求-应答消息实现原理也比较简单如下图所示 生产者和消费者会跟RocketMQ服务端进行网络连接
所以他们都是通过这个连接来发送和拉取消息的
当服务端接收到回复消息之后有个专门处理回复消息的类 这个类就会直接找到发送消息的生产者的连接之后会通过这个连接将回复消息发送给生产者 RocketMQ底层是基于Netty通信的所以如果你有用过Netty的话应该都知道就是通过Channel来发送的 联系方式
关于文章中大家有任何疑问可以通过关注公众号《编程乐学》进行留言同时公众号还有更多有趣的项目以及关于学习编程的笔记资料大家可以看看欢迎大家进行留言。