当前位置: 首页 > news >正文

平面设计素材网站推荐网络公司网站报价方案

平面设计素材网站推荐,网络公司网站报价方案,连锁店进销存软件,安徽 网站开发文章目录 消费模式同步消息异步消息单向消息延迟消息批量消息顺序消息事务消息Tag标签和Key键Tag的使用Key的使用 首先引入rocketmq的依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdve… 文章目录 消费模式同步消息异步消息单向消息延迟消息批量消息顺序消息事务消息Tag标签和Key键Tag的使用Key的使用 首先引入rocketmq的依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.2/version /dependency然后我们编写一个简单的生产者和消费者 SpringBootTest public class RocketMQTest {/*** 对于生产者 同一组的生产者可以向不同的topic队列发送消息*/Testpublic void produce() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(test-producer-group);producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message new Message(testTopic,一个简单的消息.getBytes());SendResult sendResult producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();}/*** 对于消费者 同一组的消费者只能接收同一个topic的消息* 并且如果存在多个消费者组他们都监听同一个topic的消息* 那么就可以选择使用 负载均衡策略 或者 广播策略*/Testpublic void consume() throws MQClientException, IOException {//创建一个消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-producer-group);consumer.setNamesrvAddr(MQConstant.NAMESRV);// * 标识订阅这个主题中的所有消息 后期会有消息过滤consumer.subscribe(testTopic, *);//设置一个监听器 他会一直监听然后是一个异步回调的机制//那么我们就不能让他start之后这个方法就返回结束 需要挂起当前的JVM(test模式得这样子)//正常运行项目的时候项目的JVM会正常运行的 不需要挂起consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {//这个就是对应的消费方法 业务处理//消息如果消费失败 那么就要重新放入到消费队列System.out.println(我是消费者);System.out.println(list.get(0).toString());System.out.println(消息上下文context);//返回值如果为null/报错/RECONSUMER_LATER 代表消费失败//消息会重新回到队列 然后过一会在投递给当前消费者或者其他消费者return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前的JVMSystem.in.read();} }这里需要注意的是对于Rocketmq如果在你的监听器中也就是这个MessageListenerConcurrently中你的返回值为null或者ConsumeConcurrentlyStatus.RECONSUME_LATER亦或者抛出了一个异常那么这条消息都会重新的被放回到我们的队列中等待其他消费者或者当前消费者再一次消费。 消费模式 MQ的消费模式可以大致分为两种一种是推Push一种是拉Pull。 Push是服务端【MQ】主动推送消息给客户端优点是及时性较好但如果客户端没有做好流控一旦服务端推送大量消息到客户端时就会导致客户端消息堆积甚至崩溃。 Pull是客户端需要主动到服务端取数据优点是客户端可以依据自己的消费能力进行消费但拉取的频率也需要用户自己控制拉取频繁容易造成服务端和客户端的压力拉取间隔长又容易造成消费不及时。 Push模式也是基于pull模式的只能客户端内部封装了api一般场景下上游消息生产量小或者均速的时候选择push模式。在特殊场景下例如电商大促抢优惠券等场景可以选择pull模式 同步消息 上面的快速入门就是发送同步消息发送过后会有一个返回值也就是mq服务器接收到消息后返回的一个确认这种方式非常安全但是性能上并没有这么高而且在mq集群中也是要等到所有的从机都复制了消息以后才会返回所以针对重要的消息可以选择这种方式 异步消息 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。 Test public void testAsyncProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-producer-group);// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg new Message(testTopic, (异步消息).getBytes());producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(发送成功);}Overridepublic void onException(Throwable e) {System.out.println(发送失败);}});System.out.println(看看谁先执行);// 挂起jvm 因为回调是异步的不然测试不出来System.in.read();// 关闭实例producer.shutdown(); }单向消息 这种方式主要用在不关心发送结果的场景这种方式吞吐量很大但是存在消息丢失的风险例如日志信息的发送。 Test public void testOnewayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-producer-group);// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg new Message(testTopic, (单向消息).getBytes());// 发送单向消息producer.sendOneway(msg);// 关闭实例producer.shutdown(); }延迟消息 消息放入mq后过一段时间才会被监听到然后消费 比如下订单业务提交了一个订单就可以发送一个延时消息30min后去检查这个订单的状态如果还是未付款就取消订单释放库存。 这里注意的是RocketMQ不支持任意时间的延时 只支持以下几个固定的延时等级等级1就对应1s以此类推最高支持2h延迟 private String messageDelayLevel “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”; Test public void testDelayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest, (延迟消息).getBytes());// 给这个消息设定一个延迟等级// messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 发送单向消息producer.send(msg);// 打印时间System.out.println(new Date());// 关闭实例producer.shutdown(); }批量消息 批量消息就是一次性发送一个消息集合出去。 Test public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-producer-group);// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();ListMessage messages Arrays.asList(new Message(testTopic, 批量消息1.getBytes()),new Message(testTopic, 批量消息2.getBytes()),new Message(testTopic, 批量消息3.getBytes()));producer.send(messages);System.out.println(批量执行任务);// 挂起jvm 因为回调是异步的不然测试不出来System.in.read();// 关闭实例producer.shutdown(); }顺序消息 我们知道一个topic中可以有多个队列那么如果我们的消息发送到多个队列中去那么很明显我们的消息消费就是并行消费的也就是没有了顺序性。 因此如果我们需要发送顺序消息也就是希望MQ那边的消费者顺序的消费一些消息我们就得按照如下方式发送顺序消息。 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序可以分为分区有序或者全局有序。 可能大家会有疑问mq不就是FIFO吗 rocketMq的broker的机制导致了rocketMq会有这个问题 因为一个broker中对应了四个queue。 不同的queue(分区队列)而消费消息的时候从多个queue上拉取消息这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中消费的时候只从这个queue上依次拉取则就保证了顺序。当发送和消费参与的queue只有一个则是全局有序如果多个queue参与则为分区有序即相对每个queue消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中消费时同一个顺序获取到的肯定是同一个队列。 package zhang.blossom.seckillbyrocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import zhang.blossom.seckillbyrocketmq.constant.MQConstant; import zhang.blossom.seckillbyrocketmq.entity.MsgModel;import java.io.IOException; import java.util.Arrays; import java.util.List;/*** author: 张锦标* date: 2023/8/17 9:58* OrderedRocketMQTest类*/SpringBootTest public class OrderedRocketMQTest {private ListMsgModel msgModels Arrays.asList(new MsgModel(qwer, 1L, 下单),new MsgModel(qwer, 1L, 短信),new MsgModel(qwer, 1L, 物流),new MsgModel(zxcv, 2L, 下单),new MsgModel(zxcv, 2L, 短信),new MsgModel(zxcv, 2L, 物流));//发送顺序消息Testpublic void orderedProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(test-producer-group);producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();//发送顺序消息 发送时要确保有序 并且要发送到同一个队列下面去msgModels.forEach(msgModel - {Message message new Message(testTopic,msgModel.toString().getBytes());try {//发送 相同的订单号应该去相同的队列producer.send(message, new MessageQueueSelector() {//这里的send方法的第三个参数arg 就是这个队列选择器的第三个参数 会传递过来Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object arg) {//这个方法的返回值就是要选择的队列//这里可以用hash的方式就可以选择到同样的队列了int hash arg.toString().hashCode();int index hash % list.size();return list.get(index);}}, msgModel.getOrderSn());} catch (MQClientException e) {throw new RuntimeException(e);} catch (RemotingException e) {throw new RuntimeException(e);} catch (MQBrokerException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}});producer.shutdown();System.out.println(发送完毕);}Testpublic void orderedConsumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-producer-group);consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe(testTopic, *);//MessageListenerConcurrently 并发模式 多线程的 失败后最多重试16次 然后放入死信队列//MessageListenerOrderly 顺序模式 单线程的 失败后无限次重试 Integer.MAX_VALUEconsumer.registerMessageListener(new MessageListenerOrderly() {//顺序模式只有一个线程来执行消费Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list,ConsumeOrderlyContext consumeOrderlyContext) {//这里的一个线程是一个队列一个线程System.out.println(new String(list.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();} }事务消息 一般我们不使用RocketMQ的事务消息所以有兴趣的可以看看其他的实现。 Tag标签和Key键 Rocketmq提供消息过滤功能通过tag或者key进行区分。 我们往一个主题里面发送消息的时候根据业务逻辑可能需要区分比如带有tagA标签的被A消费带有tagB标签的被B消费还有在事务监听的类里面只要是事务消息都要走同一个监听我们也需要通过过滤才区别对待。 Tag的使用 Test public void tagProducer() throws Exception {DefaultMQProducer producer new DefaultMQProducer(test-producer-group);producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message1 new Message(testTopic, test1,test1的消息.getBytes() );Message message2 new Message(testTopic, test2,test2的消息.getBytes() );producer.send(message1);producer.send(message2);producer.shutdown();System.out.println(消息发送成功); }Test public void test1Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-producer-group);consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe(testTopic, test1);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(消费test1的消息new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); } Test public void test2Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-producer-group);consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe(testTopic, test1 || test2);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(消费test1/test2的消息new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }什么时候该用Topic什么时候该用 Tag 总结不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式那么我们要使用tag进行区分 可以从以下几个方面进行判断 1.消息类型是否一致如普通消息、事务消息、定时延时消息、顺序消息不同的消息类型使用不同的 Topic无法通过 Tag 进行区分。 2.业务是否相关联没有直接关联的消息如淘宝交易消息京东物流消息使用不同的 Topic 进行区分而同样是天猫交易消息电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。 3.消息优先级是否一致如同样是物流消息盒马必须小时内送达天猫超市 24 小时内送达淘宝物流则相对会慢一些不同优先级的消息用不同的 Topic 进行区分。 4.消息量级是否相当有些业务消息虽然量小但是实时性要求高如果跟某些万亿量级的消息使用同一个 Topic则有可能会因为过长的等待时间而“饿死”此时需要将不同量级的消息进行拆分使用不同的 Topic。 总的来说针对消息分类您可以选择创建多个Topic或者在同一个 Topic 下创建多个 Tag。但通常情况下不同的 Topic 之间的消息没有必然的联系而 Tag 则用来区分同一个 Topic 下相互关联的消息例如全集和子集的关系、流程先后的关系。 Key的使用 在rocketmq中的消息默认会有一个messageId当做消息的唯一标识我们也可以给消息携带一个key用作唯一标识或者业务标识包括在控制面板查询的时候也可以使用messageId或者key来进行查询。 Test public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-producer-group);// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg new Message(testTopic,test1,key, 我是一个带标记和key的消息.getBytes());SendResult send producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown(); }Test public void testKeyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-producer-group);// 设置nameServer地址consumer.setNamesrvAddr(MQConstant.NAMESRV);// 订阅一个主题来消费 表达式默认是*,支持tagA || tagB || tagC 这样或者的写法 只要是符合任何一个标签都可以消费consumer.subscribe(testTopic, test1 || test2 || test3);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- new String(msgs.get(0).getBody()));System.out.println(msgs.get(0).getTags());System.out.println(msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }
http://www.zqtcl.cn/news/210579/

相关文章:

  • 不备案的网站很慢网站双线主机优势
  • 南京电子商务网站建设23个营销专业术语
  • 建设银行官网官方网站学习网页制作的网站
  • 开发网站需要什么硬件今年最流行的装修风格
  • 门户网站建设中标结果百度资讯指数
  • 定制企业网站开发公司网站建设的6个基本步骤
  • 网站建设与维护案列网站作品怎么做
  • 茂名放心营销网站开发seo收费
  • 旅游网站品牌建设本地使用宝塔安装wordpress
  • 专门做外链的网站制作论坛类网站模板免费下载
  • 靖江建设行业协会网站投资做网站
  • 做网站视频背景潍坊网站制作建设
  • 深圳市官网网站建设哪家好百度抓取网站登录
  • 免费做cpa单页网站友情链接买卖代理
  • 免费网站建站排名中国最大的软件公司
  • 码云pages做静态网站广西建设培训网
  • 建设网站需要花钱吗网站seo方案策划书
  • 德阳网站怎么做seo陈木胜个人资料
  • 电子规划书商务网站建设wordpress主机推荐
  • wordpress设置多站点html5开发手机app
  • 移动互联和网站开发哪个好做推广便宜的网站有哪些
  • 极速网站建设定制价格微信公众号运营助手
  • .net制作网站开发教程在线修图编辑器
  • 哪些网站可以做详情页聊城高新区建设局网站
  • 湖南网站优化代运营山东建设厅证件查询网址
  • 以百度云做网站空间浙江外贸网站建设
  • 南通网站建设推广专家wordpress 信息流 主题
  • 网站培训机构有哪些大学生做企业网站
  • 网站培训班有哪些课程做的好的大学生旅行有哪些网站好
  • 昌江县住房和城乡建设局网站佛山建设网站制作