做外贸必应网站产品曝光,网站建设的技术有哪些内容,谷歌的英文网站,定制化网站建设公司消费者pull和push
pull 为主动从broker获取消息 Push为broker主动推送消息个consumer 实时性更高#xff0c;但流量要自己控制 PullBatchSize#xff0c;代表的是每次从broker的一个队列上拉取的最大消息数。 consumeThreadMax 和 consumeThreadMin 代表消费者pull消息时需要…消费者pull和push
pull 为主动从broker获取消息 Push为broker主动推送消息个consumer 实时性更高但流量要自己控制 PullBatchSize代表的是每次从broker的一个队列上拉取的最大消息数。 consumeThreadMax 和 consumeThreadMin 代表消费者pull消息时需要的线程最大和最小数量
广播模式和集群模式
广播把消息发送给订阅这个主题的所有消费者 广播消息不支持消息重试
集群是消息只要有一个消费者消费后变算为消费成功 顺序消息必须为集群模式
顺序消息
Rocketmq全局顺序消息和局部顺序 全局顺序一个Topic内所有的消息按照先进先出的顺序进行发布和消费。 部分顺序一个部分内所有的消息按照先进先出的顺序进行发布和消费。
三个阶段保证消息顺序 生产顺序性单一的生产者 并且 串行发送消息 存储时保持和发送的顺序一致 消费时保持和存储的顺序一致
关于重试
顺序消息可以设置最大重试次数若不设置则可以认为是无限次 若设置则达到最大重试次数时消息会变为已消费会执行后序消息可能无法保证消息的顺序性所以要做好顺序验证
顺序消息的重试间隔为固定时间 无序消息的重试时间为阶梯时间
重试次数可通过maxReconsumeTimes 参数进行设置
广播消息不可重试
全局顺序
生产者 创建topic时只创建一个queue所有的消息都保存在同一个broker里就可以保证顺序 或者选择其中一个队列也可以 生产者就和普通的没区别
producer.send(message);消费者
PostConstruct
public void init{consumer new DefaultMQPushConsumer(group);consumer.setNamesrvAddr(1270.0.1:9876);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setPullInterval(2000);consumer.setPullBatchSize(100);// 顺序消息设置为1多个其他的会被空置consumer.setConsumeThreadMin(1);try {consumer.subscribe(topic, tag);} catch (MQClientException e) {throw new RuntimeException(e);}consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) - {if (CollectionUtils.isEmpty(msgs)) {return ConsumeOrderlyStatus.SUCCESS;}for (MessageExt msg : msgs) {try {// 处理message消息} catch (Exception e) {log.info(Consumer message failed, e);throw new RuntimeException(e);}}return ConsumeOrderlyStatus.SUCCESS;});try {consumer.start();} catch (Exception e) {throw new RuntimeException(e);}
}局部顺序
假设场景一个订单的不同操作需要保证顺序比如订单生成-支付-完成 此时方法中arg参数传订单号即可保证需要顺序的消息有一个统一的标识可以进入到同一个队列中 顺序消息的逻辑就是通过统一标识的hashcode和队列数量size进行取余操作 所以顺序消息有个前提是这个topic的队列数量不可随意修改(倍数可以)否则顺序消息会出现异常(可提前设置此topic的队列数量为最大值16)
生产者
public void sendMsg(MessageInfo info,Object arg){String json JSON.toJSONString(info);Message message new Message(messageType, json.getBytes());try {producer.send(message, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {int value arg.hashCode() % list.size();if (value 0) {value Math.abs(value);}return list.get(value);}}, arg);} catch (Exception e) {throw new RuntimeException(e);}
}
消费者 同全局顺序消费