当前位置: 首页 > news >正文

小米官方网站开发版在哪里北京集团公司注册流程

小米官方网站开发版在哪里,北京集团公司注册流程,wordpress萌化,公司注册费用计入什么科目一、mq的作用和使用场景 MQ的基本作用 MQ#xff08;Message Queue#xff0c;消息队列#xff09;是一种应用程序对应用程序的通信方法#xff0c;主要作用包括#xff1a; 异步处理#xff1a;解耦生产者和消费者#xff0c;允许生产者发送消息后立即返回#xff0…一、mq的作用和使用场景 MQ的基本作用 MQMessage Queue消息队列是一种应用程序对应用程序的通信方法主要作用包括 异步处理解耦生产者和消费者允许生产者发送消息后立即返回消费者异步处理 应用解耦降低系统间的直接依赖通过消息进行间接通信 流量削峰缓冲突发流量避免系统被压垮 消息通信实现系统间的可靠消息传递 最终一致性支持分布式事务的最终一致性方案 主要使用场景 1. 异步处理 用户注册后发送邮件/短信注册流程快速完成通知类操作异步处理 日志收集应用将日志发送到MQ由专门服务异步处理 2. 应用解耦 电商订单系统订单服务生成订单后通过MQ通知库存、物流、支付等系统 微服务架构服务间通过MQ通信而非直接调用 3. 流量削峰 秒杀系统将大量请求先放入MQ按系统能力逐步处理 突发流量处理应对促销活动等流量高峰 4. 日志处理 大数据分析收集各系统日志到MQ由大数据平台统一处理 实时监控系统指标通过MQ传输到监控平台 5. 消息通信 聊天系统用户消息通过MQ传递 通知系统系统间的事件通知 适用场景总结表 场景关键技术优势应用解耦消息队列减少系统间直接依赖异步处理生产者-消费者模型提升响应速度流量削峰队列积压限速消费保护后端系统跨语言通信AMQP 多语言支持统一通信协议发布/订阅Exchangefanout/topic一对多消息广播延迟队列TTL 死信队列实现定时任务 二、mq的优点 1. 解耦系统组件 生产者和消费者无需相互感知对方的存在 系统间通过消息通信而非直接调用降低耦合度 新增消费者不会影响生产者代码 2. 异步处理提升性能 生产者发送消息后无需等待消费者处理完成 非关键路径操作可异步执行如发送通知、记录日志 显著减少系统响应时间提高吞吐量 3. 流量削峰与过载保护 缓冲突发流量避免系统被瞬间高峰压垮 消费者可按自身处理能力从队列获取消息 特别适合秒杀、促销等瞬时高并发场景 4. 提高系统可靠性 消息持久化确保重要数据不丢失 重试机制和死信队列处理失败消息 网络波动时仍能保证消息最终送达 5. 扩展性强 可轻松增加消费者实例提高处理能力 天然支持分布式系统架构 各组件可独立扩展生产者、MQ本身、消费者 6. 顺序保证 某些MQ如Kafka可保证消息顺序性 对需要严格顺序的业务场景非常重要 7. 最终一致性支持 实现分布式事务的最终一致性方案 通过消息驱动的方式同步系统状态 比强一致性方案性能更高 8. 灵活的通信模式 支持点对点、发布/订阅等多种模式 可实现广播、组播等不同消息分发方式 适应各种业务场景需求 9. 系统恢复能力 消费者宕机恢复后可从断点继续消费 避免数据丢失或重复处理 支持消息回溯重新消费 10. 平衡资源利用率 平滑系统负载避免资源闲置或过载 提高整体资源使用效率 降低系统建设成本无需按峰值配置资源 三、mq的缺点 1. 系统复杂度增加 引入MQ后系统架构变得更加复杂需要额外维护MQ集群 需要处理消息的发送、接收、确认、重试等逻辑 增加了调试和问题排查的难度如消息丢失、重复消费等 2. 消息一致性问题 消息丢失生产者发送失败、MQ宕机、消费者处理失败都可能导致消息丢失 消息重复网络问题或消费者超时可能导致消息被重复消费需业务层做幂等处理 顺序问题某些MQ如Kafka只能保证分区内有序全局有序需要额外设计 3. 延迟问题 异步处理导致延迟消息队列的消费通常是异步的不适合实时性要求极高的场景如支付交易 堆积时延迟加剧如果消费者处理速度跟不上消息堆积会导致延迟越来越高 4. 运维成本高 集群管理MQ本身需要高可用部署如Kafka的ZooKeeper依赖、RabbitMQ的镜像队列 监控与告警需监控消息积压、消费延迟、错误率等指标 资源占用MQ集群可能占用较多CPU、内存和磁盘IO 5. 数据一致性与事务问题 分布式事务挑战如果业务涉及数据库和MQ的协同如扣库存发消息需要引入事务消息或本地消息表等方案 最终一致性MQ通常只保证最终一致性不适合强一致性要求的场景 6. 依赖风险 MQ成为单点故障如果MQ集群崩溃可能导致整个系统不可用 版本兼容性问题MQ升级可能影响生产者和消费者的兼容性 7. 消息积压风险 消费者处理能力不足如果消费者宕机或处理缓慢消息会堆积可能导致MQ存储爆满 影响新消息处理积压严重时新消息可能被阻塞或丢弃 8. 不适合所有场景 低延迟场景如高频交易、实时游戏MQ的异步机制可能引入不可接受的延迟 小规模系统如果系统简单直接调用可能比引入MQ更高效 四、mq相关产品每种产品的特点 1. RabbitMQ 特点 基于AMQP协议支持多种客户端语言 轻量级易于部署和管理 提供灵活的路由机制直连/主题/扇出/头交换 支持消息确认、持久化、优先级队列 集群部署相对简单 社区活跃文档完善 适用场景 中小规模消息处理 需要复杂路由规则的场景 企业级应用集成 对延迟要求不高的异步任务 2. Kafka 特点 超高吞吐量百万级TPS 分布式、高可用设计 基于发布/订阅模式 消息持久化存储可配置保留时间 支持消息回溯和批量消费 水平扩展能力强 支持流式处理Kafka Streams 适用场景 大数据日志收集与分析 实时流处理 高吞吐量消息系统 事件溯源 监控数据聚合 3. RocketMQ 特点 阿里开源经受双11考验 支持事务消息 严格的顺序消息 支持消息轨迹查询 分布式架构高可用 支持定时/延迟消息 支持消息过滤 适用场景 电商交易系统 金融支付场景 需要严格顺序的消息处理 分布式事务场景 4. ActiveMQ 特点 支持JMS规范 支持多种协议STOMP、AMQP、MQTT等 提供消息持久化和事务支持 支持集群部署 相对轻量级 适用场景 传统企业应用集成 需要JMS支持的场景 中小型消息系统 IoT设备通信 5. Pulsar 特点 云原生设计计算存储分离架构 支持多租户 低延迟和高吞吐并存 支持多种消费模式独占/共享/故障转移 支持分层存储热数据冷数据 内置函数计算能力 适用场景 云原生应用 多租户SaaS平台 需要统一消息和流处理的场景 混合云部署 6. ZeroMQ 特点 无中间件基于库的方式 极高性能纳秒级延迟 支持多种通信模式请求-响应/发布-订阅等 轻量级无消息持久化 无broker架构 适用场景 高性能计算 低延迟通信 进程间通信 不需要持久化的场景 7. NATS 特点 极简设计性能优异 无持久化NATS Streaming提供持久化扩展 支持请求-响应模式 轻量级适合云环境 低资源消耗 适用场景 IoT设备通信 云原生微服务 不需要持久化的实时消息 服务发现和配置分发 选型建议对比表 特性 \ MQRabbitMQKafkaRocketMQPulsarActiveMQ吞吐量中极高高高中延迟低中低低中顺序保证有限分区有序严格有序分区有序有限持久化支持支持支持支持支持事务支持有限支持支持支持支持集群扩展中等容易中等容易中等运维复杂度低高中中低适用规模中小超大中大中大中小 五、rabbitmq的搭建过程 Docker安装方式 # 拉取镜像 docker pull rabbitmq:management ​ # 运行容器 docker run -d --name rabbitmq \ -p 5672:5672 -p 15672:15672 \ -e RABBITMQ_DEFAULT_USERadmin \ -e RABBITMQ_DEFAULT_PASSpassword \ rabbitmq:management Linux安装方式 # 1. 安装Erlang sudo apt-get install erlang ​ # 2. 下载RabbitMQ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb ​ # 3. 安装 sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb ​ # 4. 启动服务 sudo systemctl start rabbitmq-server ​ # 5. 启用管理插件 sudo rabbitmq-plugins enable rabbitmq_management ​ # 6. 创建用户 sudo rabbitmqctl add_user admin password sudo rabbitmqctl set_user_tags admin administrator sudo rabbitmqctl set_permissions -p / admin .* .* .* 六、rabbitmq相关角色 1. 生产者 (Publisher/Producer) 职责创建并发送消息到 RabbitMQ 服务器 特点 不直接将消息发送到队列而是发送到交换器 (Exchange) 可以设置消息属性如持久化、优先级等 通常不知道消息最终会被哪些消费者接收 2. 消费者 (Consumer) 职责接收并处理来自队列的消息 特点 可以订阅一个或多个队列 可以手动或自动确认消息 (ack/nack) 可以设置 QoS服务质量控制预取数量 3. 代理服务器 (Broker) 职责RabbitMQ 服务本身负责接收、路由和传递消息 组成 Exchange交换器 Queue队列 Binding绑定 4. 交换器 (Exchange) 类型 类型路由规则典型用途Direct精确匹配 Routing Key点对点精确路由Fanout忽略 Routing Key广播到所有绑定队列广播通知Topic模糊匹配 Routing Key支持通配符多条件路由Headers根据消息头属性匹配复杂路由条件 特性 接收生产者发送的消息 根据类型和绑定规则将消息路由到队列 可以持久化在服务器重启后仍然存在 5. 队列 (Queue) 特性 消息存储的缓冲区 可以有多个消费者竞争消费模式 可配置属性 持久化Durable 自动删除Auto-delete 排他性Exclusive 消息 TTL存活时间 最大长度等 6. 绑定 (Binding) 作用连接 Exchange 和 Queue 的规则 组成要素 Exchange 名称 Queue 名称 Routing Key或用于 Headers Exchange 的匹配参数 7. 通道 (Channel) 特点 在 TCP 连接上建立的虚拟连接 轻量级减少 TCP 连接开销 每个 Channel 有独立 ID 建议每个线程使用独立的 Channel 8. 虚拟主机 (Virtual Host) 作用提供逻辑隔离环境 特点 类似于命名空间 每个 vhost 有独立的 Exchange、Queue 和绑定 需要单独配置权限 默认 vhost 为 / 9. 管理员角色 (Administrator) 权限 管理用户权限 创建/删除 vhost 查看所有资源 通常通过 rabbitmqctl 工具或管理界面操作 10. 插件系统 (Plugins) 常见插件 rabbitmq_management提供 Web 管理界面 rabbitmq_shovel跨集群消息转移 rabbitmq_federation分布式部署支持 rabbitmq_delayed_message_exchange延迟消息 角色交互示意图 ------------       ---------       -------       -------- | Publisher | ---- | Exchange| | Queue | ---- | Consumer| ------------       ---------       -------       --------(Binding) 七、rabbitmq内部组件 1、ConnectionFactory连接管理器应用程序与Rabbit之间建立连接的管理器程序代码中使用。 2、Channel信道消息推送使用的通道。 3、Exchange交换器用于接受、分配消息。 4、Queue队列用于存储生产者的消息。 5、RoutingKey路由键用于把生成者的数据分配到交换器上。 6、BindingKey绑定键用于把交换器的消息绑定到队列上。 八、生产者发送消息的过程 一、建立连接阶段 TCP连接建立 生产者应用通过AMQP客户端库发起TCP连接 默认端口5672带管理插件时为5672/15672 三次握手完成后建立物理连接 ConnectionFactory factory new ConnectionFactory(); factory.setHost(rabbitmq-host); factory.setPort(5672); Connection connection factory.newConnection(); 认证与vhost选择 发送START/START-OK协议帧进行认证 选择虚拟主机vhost默认/ 认证失败会收到CONNECTION-CLOSE帧 二、通道创建阶段 通道Channel初始化 在TCP连接上创建虚拟通道Channel 每个Channel有唯一ID从1开始递增 通道参数协商 Frame Max Size默认128KB Channel Max默认2047 Channel channel connection.createChannel(); 交换器声明可选 检查目标Exchange是否存在 不存在时根据参数自动创建 关键参数 typeexchange类型direct/fanout/topic/headers durable是否持久化 autoDelete无绑定时是否自动删除 channel.exchangeDeclare(order.exchange, direct, true); 三、消息发布阶段 消息构造 组成结构 {body: 消息内容二进制,properties: {delivery_mode: 2, # 1-非持久化 2-持久化priority: 0,       # 0-9优先级headers: {},       # 自定义头timestamp: 1620000000} } 发布消息到Exchange 通过Basic.Publish命令发送 关键参数 exchange目标交换器名称 routingKey路由键 mandatory是否触发Return回调 immediate已废弃参数 channel.basicPublish(order.exchange, order.create, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes ); 四、消息路由阶段 Exchange路由决策 根据Exchange类型处理 Direct精确匹配routingKey Fanout忽略routingKey广播到所有绑定队列 Topic通配符匹配*匹配一个词#匹配零或多个词 Headers匹配header键值对 队列投递 成功匹配消息进入队列内存缓冲区 无匹配时处理 设置了alternate-exchange转到备用交换器 未设置备用交换器且mandatorytrue触发Return回调 否则丢弃消息 五、确认阶段 Confirm模式可选 开启方式 channel.confirmSelect(); // 开启Confirm模式 确认机制 单条确认waitForConfirms() 批量确认waitForConfirmsOrDie() 异步回调 channel.addConfirmListener((sequenceNumber, multiple) - {// 处理ack }, (sequenceNumber, multiple) - {// 处理nack }); 事务模式可选 事务操作流程 channel.txSelect(); // 开启事务 try {channel.basicPublish(...);channel.txCommit(); // 提交事务 } catch (Exception e) {channel.txRollback(); // 回滚事务 } 六、资源释放阶段 通道关闭 发送Channel.Close命令 处理未确认消息 事务模式回滚未提交消息 Confirm模式未确认消息会触发nack 连接关闭 发送Connection.Close命令 服务端释放相关资源 客户端等待TCP连接正常关闭 九、消费者接收消息过程 一、连接建立阶段 TCP连接初始化 消费者客户端与RabbitMQ服务器建立TCP连接默认端口5672 完成AMQP协议握手 通道创建 在TCP连接上创建虚拟通道Channel 每个Channel独立维护消息流状态 关键参数设置 Channel channel connection.createChannel(); channel.basicQos(10); // 设置prefetch count 二、队列订阅阶段 队列声明与检查 检查目标队列是否存在 自动创建队列如果不存在且允许 channel.queueDeclare(order.queue, true, false, false, null); 队列参数解析 durable是否持久化 exclusive是否排他队列 autoDelete无消费者时是否自动删除 arguments扩展参数TTL、死信等 消费者注册 向Broker注册消费者标签consumer tag 选择消费模式 推模式Push API服务端主动推送 拉模式Basic.Get客户端主动拉取 三、消息接收阶段 消息推送机制 Broker按照QoS设置推送消息 while (unacked_count prefetch_count) and (queue.has_messages):message queue.next_message()send_to_consumer(message)unacked_count 1 消息帧结构 Basic.Deliver(consumer-tag,delivery-tag,redelivered,exchange,routing-key ) Message Body 消息处理流程 消费者接收消息后的处理步骤 反序列化消息体 验证消息完整性 执行业务逻辑 发送ack/nack 处理异常情况 四、确认与反馈阶段 消息确认机制 自动确认autoAcktrue 消息发出即视为成功 高风险消息可能处理失败但已确认 手动确认autoAckfalse // 成功处理 channel.basicAck(deliveryTag, false); // 处理失败requeuetrue重新入队 channel.basicNack(deliveryTag, false, true); 关键参数 deliveryTag消息唯一标识 multiple是否批量操作 requeue是否重新入队 拒绝消息处理 三种拒绝方式对比 方法是否批量是否重入队列适用场景basicReject否可配置单条消息处理失败basicNack是可配置批量消息处理异常basicRecover-是重新投递未ack消息 五、流量控制机制 QoS预取设置 作用限制未确认消息数量 全局 vs 通道级 // 单通道限制 channel.basicQos(10); // 全局限制所有通道总和 channel.basicQos(10, true); 最佳实践值 高吞吐场景100-300 高延迟任务5-10 流控Flow Control 当消费者处理能力不足时 Broker暂停发送新消息 触发Channel.Flow命令 消费者处理积压后恢复流动 六、异常处理阶段 连接中断处理 自动恢复机制 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); 恢复过程 重建TCP连接 恢复所有Channel 重新注册消费者 恢复未ack消息根据redelivered标记 死信处理 触发条件 消息被拒绝且requeuefalse 消息TTL过期 队列达到长度限制 死信队列配置 MapString, Object args new HashMap(); args.put(x-dead-letter-exchange, dlx.exchange); channel.queueDeclare(order.queue, true, false, false, args); 消费者最佳实践 幂等性设计 // 使用消息ID实现幂等 if (processedMessageIds.contains(messageId)) {channel.basicAck(tag, false);return; } 批量确认优化 // 每处理100条消息批量确认一次 if (messageCount % 100 0) {channel.basicAck(lastTag, true); } 死信监控 // 监听死信队列 channel.basicConsume(dlx.queue, false, (tag, msg) - {log.error(死信消息: {}, msg.getBody());channel.basicAck(tag, false); }); 消费者标签管理 // 优雅关闭消费者 void shutdown() {channel.basicCancel(consumerTag);// 等待处理中的消息完成while (inProgressCount 0) {Thread.sleep(100);} } 十、springboot项目中如何使用mq 十一、如何保障消息不丢失 1、发送阶段发送阶段保障消息到达交换机 事务机制|confirm确认机制 2、存储阶段持久化机制 交换机持久化、队列的持久化、消息内容的持久化 3、消费阶段消息的确认机制 自动ack|手动ack 接收方消息确认机制 自动ack|手动ack spring:rabbitmq:host: 1.94.230.82port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual package com.hl.rabbitmq01.web; ​ import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; ​ import java.io.IOException; ​ RestController RequestMapping(/c) public class ConsumerController { ​RabbitListener(queues {topicQueue01})public void receive(Message message, Channel channel) throws IOException {String msg new String(message.getBody());System.out.println(msg);//业务逻辑 比如传入订单id根据订单id减少库存、支付等// 如果操作成功确认消息从队列移除如果操作失败手动拒绝消息if(msg.length() 5){//确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒绝消息 not ack// 第三个参数requeue重回队列。如果设置为true则消息重新回到queuebroker会重新发送该消息给消费端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //           channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);} ​ ​} } 消息的持久化机制 交换机的持久化 队列的持久化 消息内容的持久化 package com.hl.rabbitmq01.direct; ​ import com.hl.rabbitmq01.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; ​ import java.io.IOException; import java.util.concurrent.TimeoutException; ​ /* 生产者 javaSE方式简单测试 发布订阅-------direct模型 生产者----消息队列----消费者*/ public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接Connection connection MQUtil.getConnection();//2、基于连接创建信道Channel channel connection.createChannel();//3、基于信道创建队列/*参数1. queue队列名称如果没有一个名字叫simpleQueue01的队列则会创建该队列如果有则不会创建2. durable:是否持久化当mq重启之后消息还在3. exclusive* 是否独占。只能有一个消费者监听这队列4。当Connection关闭时是否删除队列autoDelete:是否自动删除。当没有Consumer时自动删除掉5. arguments参数。*/channel.queueDeclare(directQueue01, true, false, false, null);channel.queueDeclare(directQueue02, false, false, false, null);/*声明交换机参数1交换机名称参数2交换机类型*/channel.exchangeDeclare(directExchange01, BuiltinExchangeType.DIRECT,true);/*绑定交换机和队列参数1队列名参数2交换机名称参数3路由key 广播模型 不支持路由key */channel.queueBind(directQueue01,directExchange01,error);channel.queueBind(directQueue02,directExchange01,error);channel.queueBind(directQueue02,directExchange01,info);channel.queueBind(directQueue02,directExchange01,trace);//发送消息到消息队列/*参数1. exchange交换机名称。简单模式下交换机会使用默认的 2. routingKey路由名称简单模式下路由名称使用消息队列名称3. props配置信息4. body发送消息数据*/ ​channel.basicPublish(directExchange01,user, MessageProperties.PERSISTENT_TEXT_PLAIN,(Hello World ).getBytes()); ​ ​//4、关闭信道断开连接channel.close();connection.close();} } package com.hl.rabbitmq01.web; ​ import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; ​ import java.io.IOException; import java.nio.charset.StandardCharsets; ​ RestController RequestMapping(/p) public class ProducerController {Autowiredprivate AmqpTemplate amqpTemplate;Autowiredprivate RabbitTemplate rabbitTemplate; ​ ​RequestMapping(/send)public void send(RequestParam(defaultValue user) String key,RequestParam(defaultValue hello) String msg) throws IOException {//amqpTemplate.convertAndSend(topicExchange, key, msg); //       rabbitTemplate.convertAndSend(topicExchange,key,msg);Channel channel rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事务模式运行 无需手动提交channel.basicPublish(topicExchange, key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());} } ​ /* 创建交换机*/ Bean public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(topicExchange).durable(true)  //是否支持持久化机制.build(); } /* 创建队列*/ Bean public Queue queue(){return QueueBuilder.durable(topicQueue01).build(); } 发送方的消息确认机制 1、事务机制 消耗资源 RabbitMQ中与事务有关的主要有三个方法 txSelect() 开始事务 txCommit() 提交事务 txRollback() 回滚事务 txSelect主要用于将当前channel设置成transaction模式txCommit用于提交事务txRollback用于回滚事务。 当我们使用txSelect提交开始事务之后我们就可以发布消息给Broke代理服务器如果txCommit提交成功了则消息一定到达了Broke了如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。 示例 RestController public class RabbitMQController { ​Autowiredprivate RabbitTemplate rabbitTemplate; ​RequestMapping(/send)public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //开启事务操作rabbitTemplate.execute(channel - {try {channel.txSelect();//开启事务 ​channel.basicPublish(Fanout_Exchange,,null,message.getBytes()); ​int i 5/0; ​channel.txCommit();//没有问题提交事务}catch (Exception e){e.printStackTrace();channel.txRollback();//有问题回滚事务} ​return null;}); ​return success;} ​ } 消费者没有任何变化。 通过测试会发现发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常就会捕获异常通过txRollback方法进行回滚事务了则消息不会发送消费者就获取不到消息。 2、confirm确认机制 推荐 同步通知 channel.confirmSelect(); //开始confirm操作 ​ channel.basicPublish(Fanout_Exchange,,null,message.getBytes()); ​ if (channel.waitForConfirms()){System.out.println(发送成功); }else{//进行消息重发System.out.println(消息发送失败进行消息重发); } 异步通知 channel.confirmSelect(); ​ channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker,就会发送一条ack消息Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println(发送消息成功);} ​//RabbitMQ因为自身内部错误导致消息丢失就会发送一条nack消息Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println(发送消息失败,重新发送消息);} }); ​ channel.basicPublish(Fanout_Exchange,,null,message.getBytes()); ​ 十二、死信交换机和死信队列 在实际开发项目时在较为重要的业务场景中要确保未被消费的消息不被丢弃例如订单业务那为了保证消息数据的不丢失可以使用RabbitMQ的死信队列机制当消息消费发生异常时将消息投入到死信队列中进行处理。 死信队列RabbitMQ中并不是直接声明一个公共的死信队列然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机当消息成为死信后可以被重新发送到死信交换机然后再发送给使用死信的消息队列。 死信交换机英文缩写DLX 。Dead Letter Exchange死信交换机死信交换机其实就是普通的交换机通过给队列设置参数 x-dead-letter-exchange 和x-dead-letter-routing-key来指向死信交换机 RabbitMQ规定消息符合以下某种情况时将会成为死信 队列消息长度到达限制队列消息个数限制 消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标队列,requeuefalse 原队列存在消息过期设置消息到达超时时间未被消费 死信消息会被RabbitMQ特殊处理如果配置了死信队列则消息会被丢到死信队列中如果没有配置死信队列则消息会被丢弃。 MapString,Object map new HashMap();map.put(x-dead-letter-exchange,deadExchange);//当前队列和死信交换机绑定map.put(x-dead-letter-routing-key,user.#);//当前队列和死信交换机绑定的路由规则 //       map.put(x-max-length,2);//队列长度map.put(x-message-ttl,10000);//队列消息过期时间时间ms ​ //       return QueueBuilder.durable(topicQueue01).build();return QueueBuilder.durable(topicQueue).withArguments(map).build(); 十三、延迟队列简介 延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。 RabbitMQ中没有延迟队列但是可以用ttltime to live死信队列方式和延迟插件两种方式来实现 ttl死信队列代码在讲死信队列时已经实现这个不再阐述。 延迟插件 人们一直在寻找用RabbitMQ实现延迟消息的传递方法到目前为止公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的RabbitMQ延迟消息插件新增了一种新的交换器类型消息通过这种交换器路由就可以实现延迟发送。 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 十四、RabbitMQ消息重复消费 RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客 业务背景 消息队列在数据传输的过程中为了保证消息传递的可靠性一般会对消息采用ack确认机制如果消息传递失败消息队列会进行重试此时便可能存在消息重复消费的问题。 比如用户到银行取钱后会收到扣款通知短信如果用户收到多条扣款信息通知则会有困惑。 解决方法一send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认然后每当有一条消息消费成功了就把该消息的唯一ID记录在Redis 上然后每次发送消息时都先去 Redis 上查看是否有该消息的 ID如果有表示该消息已经消费过了不再处理否则再去处理。 2.1 利用数据库唯一约束实现幂等 解决方法二insert if not exist 可以通过给消息的某一些属性设置唯一约束比如增加唯一uuid添加的时候查询是否存对应的uuid存在不操作不存在则添加那样对于相同的uuid只会存在一条数据 解决方法三sql的乐观锁 比如给用户发送短信变成如果该用户未发送过短信则给用户发送短信此时的操作则是幂等性操作。但在实际上对于一个问题如何获取前置条件往往比较复杂此时可以通过设置版本号version每修改一次则版本号1在更新时则通过判断两个数据的版本号是否一致。 十五、RabbitMQ消息积压 RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客 消息积压产生的原因 正常而言一般的消息从消息产生到消息消费需要经过以下几种阶段。 以Direct模式为例 消息由生产者产生比如新订单的创建等经过交换机将消息发送至指定的队列中然后提供给对应的消费者进行消费。 在这个链路中存在消息积压的原因大致分为以下几种 1、消费者宕机导致消息队列中的消息无法及时被消费出现积压。 2、消费者没有宕机但因为本身逻辑处理数据耗时导致消费者消费能力不足引起队列消息积压。 3、消息生产方单位时间内产生消息过多比如“双11大促活动”导致消费者处理不过来。 消息积压问题解决 针对上面消息积压问题的出现大致进行了分析那么根据分析则能制定相关的应对方法。如下所示 1、大促活动等导致生产者流量过大引起积压问题。 提前增加服务器的数量增加消费者数目提升消费者针对指定队列消息处理的效率。 2、上线更多的消费者处理消息队列中的数据。(和1中的大致类似) 3、如果成本有限则可以专门针对这个队列编写一个另类的消费者。 当前另类消费者不进行复杂逻辑处理只将消息从队列中取出存放至数据库中然后basicAck反馈给消息队列。 十六、消息入库消息补偿 如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时RabbitMQ挂了这样消息还是丢失了或者RabbitMQ在发送确认消息给生产端的过程中由于网络故障而导致生产端没有收到确认消息这样生产端就不知道RabbitMQ到底有没有收到消息这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失我们自己也要做一些消息补偿机制以应对一些极端情况。 在使用消息队列Message Queue时消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时补偿机制允许系统将消息重新发送或执行其他操作以确保消息的可靠传递和处理。 补偿机制通常涉及以下几个方面 重试机制当消息处理失败时补偿机制会尝试重新发送消息给消费者以便重新处理。重试间隔和重试次数可以根据具体情况进行配置以避免重复投递导致的消息处理失败。 延时队列补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时可以将该消息放入到延时队列中在一定的延时之后再次尝试发送给消费者进行处理。 死信队列当消息无法被成功处理时可以将这些无法处理的消息发送到死信队列Dead Letter Queue。死信队列通常用于存储无法被消费者处理的消息以便后续进行排查和处理。 可视化监控和报警补偿机制还可以包括对消息队列的监控和报警功能以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况及时发现问题并采取相应的补救措施。 补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景不同的消息队列系统可能提供不同的补偿机制。因此在选择和使用消息队列时需要根据自身的需求和系统特点来选择适合的消息补偿机制。
http://www.zqtcl.cn/news/601663/

相关文章:

  • 为什么网站权重会掉房地产开发网站建设
  • 大连seo整站优化酷播wordpress视频插件
  • 好的网页设计网站学编程要什么学历
  • 做网站公司电话福建城乡建设网站查询
  • 郑州做网站哪个公司好做二手市场类型的网站名字
  • 网站建设与维护里面的个人简历选择网站建设公司好
  • 济南浩辰网站建设公司怎么样wordpress 3.8 问题
  • 柳州正规网站制作公司网站ww正能量
  • 网站seo优化工具网站推广策略方法
  • 企业网站建设知名wordpress 自定义php
  • 用php做的网站有哪些全能网站建设
  • 网站显示正在建设中wordpress 排行榜 页面
  • 手机管理网站网站打开速度优化
  • 做微网站需要什么做的比较好的美食网站有哪些
  • 五金商城网站建设注意wordpress虚拟空
  • 成都工程网站建设网站界面设计的优点
  • 网站建设里的知识找别人做公司网站第一步做什么
  • 婚纱摄影网站模板之家专业seo网站优化公司
  • 商丘市住房和城乡建设局网站广西网站建设timkee
  • php网站开发是做什么的网站策划总结
  • 站长工具seo推广秒收录WordPress注册插件中文
  • 目前个人网站做地最好是哪几家做汽配网站需要多少钱
  • php做网站多少钱网络营销推广方案3篇
  • 浙江坤宇建设有限公司 网站省直部门门户网站建设
  • 直播类网站怎么做上海市建设质量协会网站
  • 筑巢做网站怎么样网站设计接单
  • 会ps的如何做网站wordpress 仿虎嗅
  • 免费响应式网站建设嘉兴建企业网站
  • 织梦网站首页幻灯片不显示建设银行网站特色
  • php企业网站开发东莞网站建设时间