高唐网站开发,淘客招商网站选品库建设,下载app软件安装手机上,wordpress更新 无法创建目录SpringBoot3整合RabbitMQ之三_工作队列模型案例 文章目录 SpringBoot3整合RabbitMQ之三_工作队列模型案例2. 工作队列模型1. 消息发布者1. 创建工作队列的配置类2. 发布消费Controller 2. 消息消费者One3. 消息消费者Two4. 消息消费者Three5. 输出结果 2. 工作队列模型 1. 消息…SpringBoot3整合RabbitMQ之三_工作队列模型案例 文章目录 SpringBoot3整合RabbitMQ之三_工作队列模型案例2. 工作队列模型1. 消息发布者1. 创建工作队列的配置类2. 发布消费Controller 2. 消息消费者One3. 消息消费者Two4. 消息消费者Three5. 输出结果 2. 工作队列模型 1. 消息发布者
1. 创建工作队列的配置类
package com.happy.msg.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** p** Description: 工作队列模型_创建名称为 work_queue 的队列 br* /p* Datetime: 2024/3/27 18:18* Author: Yuan · JinSheng br* Since 2024/3/27 18:18*/
Configuration
public class WorkQueueConfig {BeanQueue workQueue() {return QueueBuilder.durable(work_queue).build();}
}
2. 发布消费Controller
package com.happy.msg.publisher;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;/*** p** Description: 生产消息的控制器 br* /p* Datetime: 2024/3/27 10:53* Author: Yuan · JinSheng br* Since 2024/3/27 10:53*/
RestController
RequestMapping(/work)
public class WorkQueuePublisherController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/send)public String sentMessage() {for (int i 1; i 10 ; i) {rabbitTemplate.convertAndSend(work_queue, work_queue队列第[i]条消息,hello,rabbitmqi);}return 发送成功;}
}
2. 消息消费者One
package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** p* Description: 工作队列模型_消息消费者一 br* /p* Datetime: 2024/3/28 20:28* Author: Yuan · JinSheng br* Since 2024/3/28 20:28*/
Slf4j
Component
public class WorkQueueConsumerOne {/**** param message 消息* Description: 监听work_queue队列中的消息当客户端启动后work_queue队列中的所有的消息都被此消费者消费并打印* Author: Yuan · JinSheng*/RabbitListener(queues work_queue)public void msg(Message message){byte[] messageBody message.getBody();String msg new String(messageBody);log.info(WorkQueueConsumerOne接收到work_queue队列中的消息{},接收时间{},msg, LocalDateTime.now());}
}
输出结果
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[1]条消息,hello,rabbitmq1,接收时间2024-03-29T10:34:34.468074100
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[3]条消息,hello,rabbitmq3,接收时间2024-03-29T10:34:34.469081600
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[5]条消息,hello,rabbitmq5,接收时间2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[7]条消息,hello,rabbitmq7,接收时间2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[9]条消息,hello,rabbitmq9,接收时间2024-03-29T10:34:34.4708118003. 消息消费者Two
package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** p* Description: 工作队列模型_消息消费者二br* /p* Datetime: 2024/3/28 20:30* Author: Yuan · JinSheng br* Since 2024/3/28 20:30*/
Slf4j
Component
public class WorkQueueConsumerTwo {/**** param message 消息* Description: 监听work_queue队列中的消息当客户端启动后work_queue队列中的所有的消息都被此消费者消费并打印* Author: Yuan · JinSheng*/RabbitListener(queues work_queue)public void msg(Message message){byte[] messageBody message.getBody();String msg new String(messageBody);log.info(WorkQueueConsumerTwo接收到work_queue队列中的消息{},接收时间{},msg, LocalDateTime.now());}
}4. 消息消费者Three
package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** p* Description: 工作队列模型_消息消费者三br* /p* Datetime: 2024/3/28 20:30* Author: Yuan · JinSheng br* Since 2024/3/28 20:30*/
Slf4j
Component
public class WorkQueueConsumerTwo {/**** param message 消息* Description: 监听work_queue队列中的消息当客户端启动后work_queue队列中的所有的消息都被此消费者消费并打印* Author: Yuan · JinSheng*/RabbitListener(queues work_queue)public void msg(Message message){byte[] messageBody message.getBody();String msg new String(messageBody);//log.info(WorkQueueConsumerTwo接收到work_queue队列中的消息{},接收时间{},msg, LocalDateTime.now());System.out.println(WorkQueueConsumerTwo接收到work_queue队列中的消息msg,接收时间LocalDateTime.now());}
}5. 输出结果 可看到消息被三个消费者按相等数量消费,总共10条消息消费者1消费了4条其他两个消费者消费了3条消息 WorkQueueConsumerTwo接收到work_queue队列中的消息work_queue队列第[3]条消息,hello,rabbitmq3,接收时间2024-03-29T10:39:32.942546200
WorkQueueConsumerThree接收到work_queue队列中的消息work_queue队列第[2]条消息,hello,rabbitmq2,接收时间2024-03-29T10:39:32.942546200
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[1]条消息,hello,rabbitmq1,接收时间2024-03-29T10:39:32.942037700
WorkQueueConsumerTwo接收到work_queue队列中的消息work_queue队列第[6]条消息,hello,rabbitmq6,接收时间2024-03-29T10:39:32.943806300
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[4]条消息,hello,rabbitmq4,接收时间2024-03-29T10:39:32.944336900
WorkQueueConsumerTwo接收到work_queue队列中的消息work_queue队列第[9]条消息,hello,rabbitmq9,接收时间2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息work_queue队列第[5]条消息,hello,rabbitmq5,接收时间2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[7]条消息,hello,rabbitmq7,接收时间2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息work_queue队列第[10]条消息,hello,rabbitmq10,接收时间2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息work_queue队列第[8]条消息,hello,rabbitmq8,接收时间2024-03-29T10:39:32.944336900