网站更改关键词,金融网站模版,网站怎么更新网页内容,建设银行网站首页打不开文章目录 介绍RabbitMQ的特点Rabbitmq术语消息发布接收流程 Docker部署管理界面说明Overview: 这个页面显示了RabbitMQ服务器的一般信息#xff0c;例如集群节点的名字、状态、运行时间等。Connections: 在这里#xff0c;可以查看、管理和关闭当前所有的TCP连接。Channels: … 文章目录 介绍RabbitMQ的特点Rabbitmq术语消息发布接收流程 Docker部署管理界面说明Overview: 这个页面显示了RabbitMQ服务器的一般信息例如集群节点的名字、状态、运行时间等。Connections: 在这里可以查看、管理和关闭当前所有的TCP连接。Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传Exchanges: 可以在这里查看、创建和删除交换机。Queues: 这个页面展示了所有当前的队列以及它们的详细信息。Admin: 在这里可以查看系统中所有的操作用户。 延时队列插件下载安装rabbitmq.conf配置文件示例1.1 rabbitmq.conf1.2 advanced.config1.3 rabbitmq-env.conf Java配置Yml完整配置RabbitMQ的六种工作模式消费者RabbitListener注解下的配置内容1.simple简单模式点对点模式2.work工作模式(一对多)3.publish/subscribe发布订阅(共享资源)4.routing路由模式5.topic 主题模式(路由模式的一种)6.RPC (基于消息的远程过程调用) 延时队列、循环队列、兜底机制、定时任务1.延时队列使用TTL死信队列组合实现延迟队列的效果。使用RabbitMQ官方延迟插件实现延时队列效果。 2.循环队列3.兜底机制4.定时任务 介绍
RabbitMQ是由Erlang语言开发的AMQP的开源实现
AMQPAdvanced Message Queue高级消息队列协议。它是应用层协议的一个开放标准为面向消息的中间件设计基于此协议的客户端与消息中间件可传递消息并不受产品、开发语言等条件的限制。
RabbitMQ的特点
可靠性(Reliablity)使用了一些机制来保证可靠性比如持久化、传输确认、发布确认。灵活的路由(Flexible Routing)在消息进入队列之前通过Exchange来路由消息。对于典型的路由功能Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能可以将多个Exchange绑定在一起也通过插件机制实现自己的Exchange。消息集群(Clustering)多个RabbitMQ服务器可以组成一个集群形成一个逻辑Broker。高可用(Highly Avaliable Queues)队列可以在集群中的机器上进行镜像使得在部分节点出问题的情况下队列仍然可用。多种协议(Multi-protocol)支持多种消息队列协议如STOMP、MQTT等。多种语言客户端(Many Clients)几乎支持所有常用语言比如Java、.NET、Ruby等。管理界面(Management UI)提供了易用的用户界面使得用户可以监控和管理消息Broker的许多方面。跟踪机制(Tracing)如果消息异常RabbitMQ提供了消息的跟踪机制使用者可以找出发生了什么。插件机制(Plugin System)提供了许多插件来从多方面进行扩展也可以编辑自己的插件
Rabbitmq术语 消费者订阅某个队列 生产者:创建消息然后发布到队列中(queue)最终将消息发送到监听的消费者。 Broker标识消息队列服务器实体. Virtual Host虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础必须在链接时指定RabbitMQ默认的vhost是 /。 Exchange交换机用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Queue消息队列用来保存消息直到发送给消费者。它是消息的容器也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面等待消费者连接到这个队列将其取走。 Banding绑定用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则所以可以将交换机理解成一个由绑定构成的路由表。 Channel通道多路复用连接中的一条独立的双向数据流通道。通道是建立在真实的TCP连接内的虚拟链接AMQP命令都是通过通道发出去的不管是发布消息、订阅队列还是接收消息这些动作都是通过通道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销所以引入了通道的概念以复用一条TCP连接。 Connection网络连接比如一个TCP连接。 Publisher消息的生产者也是一个向交换器发布消息的客户端应用程序。 Consumer消息的消费者表示一个从一个消息队列中取得消息的客户端应用程序。 Message消息消息是不具名的它是由消息头和消息体组成。消息体是不透明的而消息头则是由一系列的可选属性组成这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
消息发布接收流程
1.发送消息
生产者和Broker建立TCP连接。生产者和Broker建立通道。生产者通过通道消息发送给Broker由Exchange将消息进行转发。Exchange将消息转发到指定的Queue队列
2.接收消息
消费者和Broker建立TCP连接 。消费者和Broker建立通道消费者监听指定的Queue队列当有消息到达Queue时Broker默认将消息推送给消费者。消费者接收到消息。
Docker部署
查询rabbitmq最新版本
docker search rabbitmq #查询镜像 已经集成了erlang无需单独安装erlang
docker pull rabbitmq #拉取镜像 最新版或指定版本 docker pull rabbitmq:3.13-management 自带管理界面
docker images # 查看拉取的镜像
#启动容器 指定管理界面登陆账号和密码
# 15672 管理界面端口
# 5672 amqp协议端口程序连接端口
# -v /mnt/data/rabbitmq/conf:/etc/rabbitmq 配置文件目录
# -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq 数据目录
# -v /mnt/data/rabbitmq/log:/var/log/rabbitmq 日志目录
# -e RABBITMQ_DEFAULT_USERtlmroot 管理界面登陆账号
# -e RABBITMQ_DEFAULT_PASS123456 管理界面登陆密码
# 最好限制容器内存 --memory 2g
docker run -d --hostname rabbitmq --name rabbitmq --restartalways -v /mnt/data/rabbitmq/conf:/etc/rabbitmq -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq -v /mnt/data/rabbitmq/log:/var/log/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USERtlmroot -e RABBITMQ_DEFAULT_PASS123456 rabbitmq:latest
# docker ps 如果容器启动失败需要提高日志挂载目录的访问权限后重启服务
chmod 777 /mnt/data/rabbitmq/log
# 进入容器内部安装管理界面插件
docker exec -it rabbitmq /bin/bash
# 容器内部创建管理界面插件 安装完成即可访问 服务器IP加15672如无法访问 关闭防火墙 systemctl stop firewalld
rabbitmq-plugins enable rabbitmq_management
# 容器内部启用所有功能标志,也可以在管理界面操作(Admin/Feature Flags)
enable all feature flags
#至此创建完成 服务器需要开放15672和5672端口管理界面说明 Overview: 这个页面显示了RabbitMQ服务器的一般信息例如集群节点的名字、状态、运行时间等。 Totals 消息数队列消息、连接数、通道数、交换机数、队列数、消费者数
Nodes节点信息 进程数、磁盘数据、运行时间、等
Churn statistics: 流失率统计最后一分钟连接操作、通道操作、队列操作
Ports and contexts: 端口信息及网络环境信息
Export definitions: 导出配置
Import definitions导入配置Connections: 在这里可以查看、管理和关闭当前所有的TCP连接。 Virtual host 所属的虚拟主机。Name 名称。User name 使用的用户名。State 当前的状态running运行中idle空闲。SSL/TLS 是否使用ssl进行连接。Protocol 使用的协议。Channels 创建的channel的总数。From client 每秒发出的数据包。To client 每秒收到的数据包。Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 channel名称。
Virtual host所属的虚拟主机。
User name使用的用户名。
Mode渠道保证模式。 可以是以下之一或者不是C: confirm。Ttransactional(事务)。
State 当前的状态running运行中idle空闲。
Unconfirmed待confirm的消息总数。
Prefetch设置的prefetch的个数。
Unacker待ack的消息总数。
publish生产端 pub消息的速率。
confirm生产端确认消息的速率。
deliver/get消费端获取消息的速率。
ack消费端 ack消息的速率Exchanges: 可以在这里查看、创建和删除交换机。 Name名称。Typeexchange type具体的type可以查看RabbitMq系列之一基础概念。Features持久化D:持久化 I内部 AD自动删除Message rate in消息输入速率。Message rate out消息输出速率Add a new exchange:Virtual host:属于哪个Virtual host我这里只有一个所以不显示Name名字同一个Virtual host里面的Name不能重复。Durability 是否持久化Durable持久化。Transient不持久化。Auto delete当最后一个绑定队列或者exchange被unbind之后该exchange自动被删除。Internal 是否是内部专用exchange是的话就意味着我们不能往该exchange里面发消息。Arguments 参数是AMQP协议留给AMQP实现做扩展使用的Queues: 这个页面展示了所有当前的队列以及它们的详细信息。 Virtual host 所属的虚拟主机。
Name 名称。
Features 功能。参数参考上述交换机页面
State 当前的状态running运行中idle空闲。
Ready 待消费的消息总数。
Unacked 待应答的消息总数。
Total 总数 ReadyUnacked。
incoming 消息进入的速率。
deliver/get 消息获取的速率。
ack 消息应答的速率。
Add a new queue:Virtual host:属于哪个Virtual hostName名字同一个Virtual host里面的Name不能重复。Durability 是否持久化Durable持久化。Transient不持久化。Auto delete当最后一个绑定队列或者exchange被unbind之后该 queue 自动被删除。Arguments 参数是AMQP协议留给AMQP实现做扩展使用的Admin: 在这里可以查看系统中所有的操作用户。 Name 名称。
Tags 角色标签只能选取一个。
Can access virtual hosts 允许进入的vhost。
Has password 设置了密码。Virtual Host虚拟主机
虚拟主机vhost提供逻辑分组和资源分离。每一个vhost本质上是一个mini版的RabbitMQ服务器拥有自己的connection、exchange、queue、binding等拥有自己的权限。vhost之于RabbitMQ就像虚拟机于物理机一样他们通过在各个实例间提供逻辑上分离允许为不同的应用程序安全保密的运行数据。Feature Flags功能标志开关
Deprecated Features已废特性
Policies策略配置
策略分为“用户策略”和“系统策略”
策略使用的是正则表达匹配规则按名称匹配一个或多个队列并将其定义的一些规则参数到匹配队列中。换句话说可以使用策略一次为多个队列配置参数。策略可以理解为给“队列”和“分发器”设置额外的“Arguments”参数。每个“分发器”和“队列”只能生效一个“策略”并且是是立即生效的。参数Apply to指定策略是只匹配队列、还是只匹配交换或两则两者都匹配。Priority表示的是策略的优先级、值越大优先级越高。Definition才是真正的规则。有四大类分别是HA高可用性、federation联合、Queues(队列)、Exchanges备用分发器HA(高可用性)表示将队列怎么镜像到节点的策略。ha-mode选项有三个分别是“all“表示同步到所有节点),“exactly”,“nodes”。exactly和nodes需要结合ha-params才能决定同步策略ha-params为数值、表示个数ha-sync-mode:手动manual/自动automatic同步Limits 可以设置最大连接数
Cluster 集群 更改集群名称
延时队列插件下载安装
延时插件链接地址3.13.0下载版本和rabbitmq版本要一致 我安装的是 3.13.0
# 下载插件到/home/目录,
curl -JL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez -o /home/rabbitmq_delayed_message_exchange-3.13.0.ez
# 可省略进入容器查询插件目录三方插件需要放在这里 (ez结尾的文件/opt/rabbitmq/plugins)
rabbitmq-plugins directories -s
# 容器/opt/rabbitmq/plugins 为插件目录 延时队列插件需要复制到这里
docker cp /home/rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/opt/rabbitmq/plugins
# 进入容器内部查询插件
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins list 如下图
rabbitmq-plugins list
# 安装插件命令同安装管理界面命令 rabbitmq-plugins enable 《plugin_name》
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 安装完成后 界面Exchanges交换机新增的时候就会出现x-delayed-messagerabbitmq.conf配置文件示例
容器运行后默认没有配置文件自带的配置足够使用自行创建放在主机/mnt/data/rabbitmq/conf/目录或是放在容器/etc/rabbitmq目录创建容器时已映射
容器内部查询有效配置
rabbitmqctl environment
RabbitMQ 常用的三种自定义服务器的通用方法配置文件 rabbitmq.conf环境变量文件 rabbitmq-env.conf补充配置文件 advanced.configrabbitmq.conf和rabbitmq-env.conf的位置在二进制安装中路径是在 安装目录下的/etc/rabbitmq/rpm 安装 /etc/rabbitmq/如果rabbitmq.conf和rabbitmq-env.conf 的两个文件不存在那么我们可以创建该文件然后我们可以通过环境变量
指定该文件的位置。补充 rabbitmqctl rabbitmqctl 是管理虚拟主机和用户权限的工具rabbitmq-plugins 是管理插件的工具1.1 rabbitmq.conf
属性描述默认值listeners要监听 AMQP 0-9-1 and AMQP 1.0 的端口listeners.tcp.default 5672num_acceptors.tcp接受tcp连接的erlang 进程数num_acceptors.tcp 10handshake_timeoutAMQP 0-9-1 超时时间也就是最大的连接时间单位毫秒handshake_timeout 10000listeners.ssl启用TLS的协议默认值为nonenum_acceptors.ssl接受基于TLS协议的连接的erlang 进程数num_acceptors.ssl 10ssl_optionsTLS 配置ssl_options nonessl_handshake_timeoutTLS 连接超时时间 单位为毫秒ssl_handshake_timeout 5000vm_memory_high_watermark触发流量控制的内存阈值可以为相对值(0.5),或者绝对值 vm_memory_high_watermark.relative 0.6 ,vm_memory_high_watermark.absolute 2GB默认vm_memory_high_watermark.relative 0.4vm_memory_calculation_strategy内存使用报告策略assigned使用Erlang内存分配器统计信息 rss使用操作系统RSS内存报告。这使用特定于操作系统的方法并可能启动短期子进程。legacy使用遗留内存报告运行时认为将使用多少内存。这种策略相当不准确。erlang 与legacy一样 是为了向后兼容vm_memory_calculation_strategy allocatedvm_memory_high_watermark_paging_ratio当内存的使用达到了50%后,队列开始将消息分页到磁盘vm_memory_high_watermark_paging_ratio 0.5total_memory_available_override_value该参数用于指定系统的可用内存总量一般不使用适用于在容器等一些获取内存实际值不精确的环境默认未设置disk_free_limitRabbitmq存储数据的可用空间限制当低于该值的时候将触发流量限制设置可参考vm_memory_high_watermark参数disk_free_limit.absolute 50MBlog.file.level控制记录日志的等级有info,error,warning,debuglog.file.level infochannel_max最大通道数但不包含协议中使用的特殊通道号0设置为0表示无限制不建议使用该值容易出现channel泄漏channel_max 2047channel_operation_timeout通道操作超时单位为毫秒channel_operation_timeout 15000heartbeat表示连接参数协商期间服务器建议的心跳超时的值。如果两端都设置为0则禁用心跳,不建议禁用heartbeat 60default_vhostrabbitmq安装后启动创建的虚拟主机default_vhost /default_user默认创建的用户名default_user guestdefault_pass默认用户的密码default_pass guestdefault_user_tags默认用户的标签default_user_tags.administrator truedefault_permissions在创建默认用户是分配给默认用户的权限default_permissions.configure .* default_permissions.read .* default_permissions.write .*loopback_users允许通过回环地址连接到rabbitmq的用户列表,如果要允许guest用户远程连接(不安全)请将该值设置为none,如果要将一个用户设置为仅localhost连接的话配置loopback_users.username true(username要替换成用户名)loopback_users.guest true(默认为只能本地连接)cluster_formation.classic_config.nodes设置集群节点cluster_formation.classic_config.nodes.1 rabbithostname1cluster_formation.classic_config.nodes.2 rabbithostname2默认为空未设置collect_statistics统计收集模式none 不发出统计信息事件coarse每个队列连接都发送统计一次,fine每发一条消息的统计数据collect_statistics nonecollect_statistics_interval统计信息收集间隔以毫秒为单位collect_statistics_interval 5000delegate_count用于集群内通信的委托进程数。在多核的服务器上我们可以增加此值delegate_count 16tcp_listen_options默认的套接字选项tcp_listen_options.backlog 128 …hipe_compile设置为true以使用HiPE预编译RabbitMQ的部分HiPE是Erlang的即时编译器,启用HiPE可以提高吞吐量两位数但启动时会延迟几分钟。Erlang运行时必须包含HiPE支持。如果不是启用此选项将不起作用。HiPE在某些平台上根本不可用尤其是Windows。hipe_compile falsecluster_keepalive_interval节点应该多长时间向其他节点发送keepalive消息(以毫秒为单位),keepalive的消息丢失不会被视为关闭cluster_keepalive_interval 10000queue_index_embed_msgs_below消息的字节大小,低于该大小消息将直接嵌入队列索引中 bytesqueue_index_embed_msgs_below 4096mnesia_table_loading_retry_timeout等待集群中Mnesia表可用的超时时间单位毫秒mnesia_table_loading_retry_timeout 30000mnesia_table_loading_retry_limit集群启动时等待Mnesia表的重试次数不适用于Mnesia升级或节点删除。mnesia_table_loading_retry_limit 10mirroring_sync_batch_size要在队列镜像之间同步的消息的批处理大小mirroring_sync_batch_size 4096queue_master_locator队列主节点的策略有三大策略 min-mastersclient-localrandomqueue_master_locator client-localproxy_protocol如果设置为true ,则连接需要通过反向代理连接不能直连接proxy_protocol falsemanagement.listener.portrabbitmq web管理界面使用的端口management.listener.port 15672
1.2 advanced.config
某些配置设置不可用或难以使用sysctl格式进行配置。因此可以使用Erlang术语格式的其他配置文件advanced.config 它将与rabbitmq.conf 文件中提供的配置合并。
属性描述默认值msg_store_index_module设置队列索引使用的模块{rabbit[ {msg_store_index_modulerabbit_msg_store_ets_index} ]}backing_queue_module队列内容的实现模块。{rabbit[ {backing_queue_modulerabbit_variable_queue} ]}msg_store_file_size_limit消息储存的文件大小,现有的节点更改是危险的可能导致数据丢失默认值16777216trace_vhosts内部的tracer使用不建议更改{rabbit[ {trace_vhosts[]} ]}msg_store_credit_disc_bound设置消息储存库给队列进程的积分,默认一个队列进程被赋予4000个消息积分{rabbit, [{msg_store_credit_disc_bound, {4000, 800}}]}queue_index_max_journal_entries队列的索引日志超过该阈值将刷新到磁盘{rabbit, [{queue_index_max_journal_entries, 32768}]}lazy_queue_explicit_gc_run_operation_threshold在内存压力下为延迟队列设置的值该值可以触发垃圾回收和减少内存使用降低该值会降低性能提高该值会导致更高的内存消耗{rabbit,[{lazy_queue_explicit_gc_run_operation_threshold, 1000}]}queue_explicit_gc_run_operation_threshold在内存压力下正常队列设置的值该值可以触发垃圾回收和减少内存使用降低该值会降低性能提高该值会导致更高的内存消耗{rabbit, [{queue_explicit_gc_run_operation_threshold, 1000}]}
1.3 rabbitmq-env.conf
通过rabbitmq-env.conf 来定义环境变量 RABBITMQ_NODENAME 指定节点名称
属性描述默认值RABBITMQ_NODE_IP_ADDRESS绑定的网络接口默认为空字符串表示绑定本机所有的网络接口RABBITMQ_NODE_PORT端口默认为5672RABBITMQ_DISTRIBUTION_BUFFER_SIZE节点之间通信连接的数据缓冲区大小默认为128000,该值建议不要使用低于64MBRABBITMQ_IO_THREAD_POOL_SIZE运行时用于io的线程数建议不要低于32linux默认为128 windows默认为64RABBITMQ_NODENAMErabbitmq节点名称集群中要注意节点名称唯一linux 默认节点名为 rabbit$hostnameRABBITMQ_CONFIG_FILErabbitmq 的配置文件路径注意不要加文件的后缀(.conf)默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq(二进制安装) /etc/rabbitmq/rabbitmq(rpm 安装)RABBITMQ_ADVANCED_CONFIG_FILEadvanced.config文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/advanced(二进制安装) /etc/rabbitmq/advanced(rpm 安装)RABBITMQ_CONF_ENV_FILE环境变量配置文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf(二进制安装) /etc/rabbitmq/rabbitmq-env.conf(rpm 安装)RABBITMQ_SERVER_CODE_PATH在使用HiPE 模块时需要使用默认为空RABBITMQ_LOGS指定日志文件位置默认为 $RABBITMQ_HOME/etc/var/log/rabbitmq/
RABBITMQ_DISTRIBUTION_BUFFER_SIZE 节点间通信缓冲区大小,默认值 128Mb,节点流量比较多的集群中可以提升该值建议该值不要低于64MB。
tcp 缓存区大小 下示例将AMQP 0-9-1连接的TCP缓冲区设置为192 KiB
tcp_listen_options.backlog 128
tcp_listen_options.nodelay true
tcp_listen_options.linger.on true
tcp_listen_options.linger.timeout 0
tcp_listen_options.sndbuf 196608
tcp_listen_options.recbuf 196608Java配置
!-- Maven依赖,Springboot默认集成--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependencyYml完整配置
spring:rabbitmq:host: 127.0.0.1 #ipport: 5672 #端口username: tlmroot #账号password: 123456 #密码virtualHost: #链接的虚拟主机 切换不同环境 dev\test\prodaddresses: 127.0.0.1:5672 #多个以逗号分隔与host功能一样。requestedHeartbeat: 60 #指定心跳超时单位秒0为不指定默认60spublisherConfirms: true #发布确认机制是否启用publisherReturns: #发布返回是否启用connectionTimeout: #链接超时。单位ms。0表示无穷大不超时### ssl相关ssl:enabled: #是否支持sslkeyStore: #指定持有SSL certificate的key store的路径keyStoreType: #key store类型 默认PKCS12keyStorePassword: #指定访问key store的密码trustStore: #指定持有SSL certificates的Trust storetrustStoreType: #默认JKStrustStorePassword: #访问密码algorithm: #ssl使用的算法例如TLSv1.1verifyHostname: #是否开启hostname验证### cache相关cache:channel: size: #缓存中保持的channel数量checkoutTimeout: #当缓存数量被设置时从缓存中获取一个channel的超时时间单位毫秒如果为0则总是创建一个新channelconnection:mode: #连接工厂缓存模式CHANNEL 和 CONNECTIONsize: #缓存的连接数只有是CONNECTION模式时生效### listenerlistener:type: #两种类型SIMPLEDIRECT 默认simple## simple类型 simple: # 一对一concurrency: #最小消费者数量maxConcurrency: #最大的消费者数量transactionSize: #指定一个事务处理的消息数量最好是小于等于prefetch的数量missingQueuesFatal: #是否停止容器当容器中的队列不可用## 与direct相同配置部分autoStartup: #是否自动启动容器acknowledgeMode: #表示消息确认方式其有三种配置方式分别是none、manual和auto默认autoprefetch: #指定一个请求能处理多少个消息如果有事务的话必须大于等于transaction数量defaultRequeueRejected: #决定被拒绝的消息是否重新入队默认是true与参数acknowledge-mode有关系idleEventInterval: #container events发布频率单位ms##重试机制retry: stateless: #有无状态enabled: #是否开启maxAttempts: #最大重试次数,默认3initialInterval: #重试间隔multiplier: #对于上一次重试的乘数maxInterval: #最大重试时间间隔direct: # 一对多consumersPerQueue: #每个队列消费者数量missingQueuesFatal:#...其余配置看上方公共配置## template相关template:mandatory: #是否启用强制信息默认falsereceiveTimeout: #receive()接收方法超时时间replyTimeout: #sendAndReceive()超时时间exchange: #默认的交换机routingKey: #默认的路由defaultReceiveQueue: #默认的接收队列## retry重试相关retry: enabled: true #是否重试功能 默认falsemaxAttempts: 3 #最大重试次数 默认为3initialInterval: 1000ms #重试间隔时间 可以使用ms、s、m、h、dmultiplier: #重试乘数默认为1即每次重试间隔时间保持不变maxInterval: 10000ms #最大重试间隔时间 与乘数结合使用
配置文件
package com.tecloman.cloud.singleton.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** author Administrator*/
Configuration
Slf4j
public class RabbitConfig {Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 序列化配置rabbitTemplate.setMessageConverter(jsonMessageConverter());rabbitTemplate.setMandatory(true);// 推送到server回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -log.info(ConfirmCallback correlationData:{},ack:{},cause:{},correlationData,ack,cause));// 消息返回给生产者, 路由不到队列时返回给发送者 先returnCallback,再 confirmCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {log.info(ReturnCallback message:{},replyCode:{},replyText:{},exchange:{},routingKey:{},message,replyCode,replyText,exchange,routingKey);});return rabbitTemplate;}Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}RabbitMQ的六种工作模式
消费者RabbitListener注解下的配置内容
Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
Retention(RetentionPolicy.RUNTIME)
MessageMapping
Documented
Repeatable(RabbitListeners.class)
public interface RabbitListener {String id() default ; // 用于为监听器指定一个唯一标识符,不指定则自动生成。String containerFactory() default ;// 指定要使用的消息监听容器工厂 bean 的名称。String[] queues() default {}; // 指定要监听的队列名称。可以是队列名称属性占位符键或表达式队列必须存在queues 属性与 bindings() 和 queuesToDeclare() 属性互斥不能同时使用Queue[] queuesToDeclare() default {}; // 用于声明要监听的队列可以通过 Queue 注解定义队列的属性。与 bindings() 和 queues() 属性互斥不能同时使用允许动态声明队列。boolean exclusive() default false; // 指定是否为独占模式即只有一个消费者可以消费该队列为true时要求并发数1。String priority() default ; // 指定消息的优先级越大优先级越高。默认为容器优先级可以为负数。String admin() default ; // 属性用于指定一个 RabbitAdmin bean 的引用。QueueBinding[] bindings() default {}; // 用于绑定队列和交换机以便监听指定的交换机中的消息。与 queues() 和 queuesToDeclare() 属性互斥不能同时使用。String group() default ; // 指定消费者所属的分组。可以用于实现分组消费确保同一组内的消费者共享消息。String returnExceptions() default ; // 定义一个异常处理策略用于处理消息发送失败时的异常情况。String errorHandler() default ; // 指定消息监听容器的错误处理器用于处理在消息处理过程中发生的错误。String concurrency() default ; // 指定消费者的并发数量表示同时处理消息的线程数或者并发消费者的数量。String autoStartup() default ; // 指定容器是否自动启动如果设置为 true则容器会在启动时自动开始侦听消息。String executor() default ; // 定义用于处理消息的执行器可以指定一个线程池来处理消息的消费逻辑。String ackMode() default ; // 指定消息确认模式用于控制消息的确认方式包括自动、手动、批量确认等。String replyPostProcessor() default ; // 定义一个后处理器用于在发送响应时对响应消息进行处理。String messageConverter() default ; // 指定消息转换器用于将消息从字节流转换为目标对象或者将目标对象转换为字节流。String replyContentType() default ; // 指定回复消息的内容类型。String converterWinsContentType() default true; // 指定转换器是否覆盖内容类型。
}
1.simple简单模式点对点模式 消息的生产者将消息放入队列 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端) /*** 配置文件添加简单模式队列* return*/Beanpublic Queue simpleQueue(){//持久化 非独占 非自动删除return QueueBuilder.durable(simpleQueue).build();}/*** 简单模式生产者*/GetMapping(/simple)public R simple(RequestParam String msg){MapString, Object map createMsg(msg);// 预先要创建好队列rabbitTemplate.convertAndSend(simpleQueue,map);return R.ok();}/*** 简单模式的消费者** param message 消息属性* param channel 通道* param msg 消息内容* throws IOException*///使用queuesToDeclare属性如果不存在则会创建队列RabbitListener(queuesToDeclare Queue(value simpleQueue))public void simple(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();// 这个tag每次服务重启会清0long tag properties.getDeliveryTag();log.info(简单模式的消费者收到:{}, msg);// 简单模式下消息其实无需确认// 由于在yml设置手动回执此处需要手动回执false不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}2.work工作模式(一对多) 消息生产者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用) 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢) /*** 配置文件添加Work模式队列work队列 默认是轮询发到消息者priority10 设置消费者优先级优先级相同轮询* return*/Beanpublic Queue workQueue(){//持久化 非独占 非自动删除return QueueBuilder.durable(workQueue).build();}/*** 生产者一次性生产50条消费消费者轮询消费消费者可设置优先级priority10越大越优先*/GetMapping(/work)public R work(RequestParam String msg) {for (int i 0; i 50; i) {rabbitTemplate.convertAndSend(workQueue, createMsg(i), message - {MessageProperties messageProperties message.getMessageProperties();//默认消息持久化设置消息不持久化messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);return message;});}return R.ok();}/*** 工作模式的消费者1,group分组属性不会生效** param message 消息属性* param channel 通道* param msg 消息内容* throws IOException*///使用queuesToDeclare属性如果不存在则会创建队列RabbitListener(queuesToDeclare Queue(value workQueue))public void work1(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(工作模式的消费者1收到:{}, msg);//手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}/*** 工作模式的消费者2** param message 消息属性* param channel 通道* param msg 消息内容* throws IOException*///使用queuesToDeclare属性RabbitListener(queuesToDeclare Queue(value workQueue))public void work2(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(工作模式的消费者2收到:{}, msg);//手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}3.publish/subscribe发布订阅(共享资源) 生产者通过fanout扇出交换机群发消息给消费者同一条消息每一个消费者都可以收到,消息生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费 相关场景:邮件群发,群聊天,广播(广告) //------------------方法1生产者创建交换机消费者创建队列与监听队列------------------ /*** 配置文件定义交换机** return*/Beanpublic Exchange fanout() {//持久化 非自动删除return ExchangeBuilder.fanoutExchange(fanout).build();}//创建初始化RabbitAdmin对象Beanpublic RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin new RabbitAdmin(connectionFactory);// 只有设置为 truespring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);// 声明交换机 fanoutrabbitAdmin.declareExchange(fanout());return rabbitAdmin;}/** 生产者 发送50条消息消费者各自消费50条*/GetMapping(/fanout)public R fanout(RequestParam String msg){for (int i 0; i 50; i) {MapString, Object map createMsg(i);// 第二个参数为路由KeyrabbitTemplate.convertAndSend(fanout,null,map);}return R.ok();}/*** 发布订阅模式方法1的消费者1,group分组属性不会生效** param message 消息属性* param channel 通道* param msg 消息内容*/RabbitListener(// 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除// declare false生产者已定义交换机此处不再声明交换bindings QueueBinding(value Queue, exchange Exchange(name fanout, declare false)))public void fanout1(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(发布订阅模式方法1的消费者1收到:{}, msg);// 手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}RabbitListener(// 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除// declare false生产者已定义交换机此处不再声明交换bindings QueueBinding(value Queue, exchange Exchange(name fanout, declare false)))public void fanout2(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(发布订阅模式方法1的消费者2收到:{}, msg);// 手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}
//------------------方法2生产者创建队列与交换机消费者监听队列------------------/*** 定义队列 持久 非排他 非自动删除* return*/Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable(fanout-queue1).build();}Beanpublic Queue fanoutQueue2(){return QueueBuilder.durable(fanout-queue2).build();}/*** 定义扇出交换机 持久 非自动删除* return*/Beanpublic FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(fanout2).build();}/*** 将队列1与交换机绑定* return*/Beanpublic Binding fanoutBinding1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}//创建初始化RabbitAdmin对象Beanpublic RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin new RabbitAdmin(connectionFactory);// 只有设置为 truespring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);// 声明交换机和队列rabbitAdmin.declareExchange(fanoutExchange());rabbitAdmin.declareQueue(fanoutQueue1());rabbitAdmin.declareQueue(fanoutQueue2());return rabbitAdmin;}// 不同消费者绑定在同一个交换机队列相同轮询消费队列不同各自消费/*** 发布订阅模式方法2的消费者1 队列不同生产者发送50条消息各自消费50条** param message 消息属性* param channel 通道* param msg 消息内容*///使用queuesToDeclare属性如果不存在则会创建队列,注此处声明的队列要和生产者属性保持一致RabbitListener(queuesToDeclare Queue(value fanout-queue1))public void fanout1(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(发布订阅模式方法2的消费者1收到:{}, msg);//手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}RabbitListener(queuesToDeclare Queue(value fanout-queue2))public void fanout2(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();log.error(发布订阅模式方法2的消费者2收到:{}, msg);//手动回执不批量签收,回执后才能处理下一批消息channel.basicAck(tag, false);} catch (IOException e) {log.error(this.getClass().getName());}}4.routing路由模式 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息; 根据业务功能定义路由字符串 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中 /*** 定义直流交换机* return*/Beanpublic Exchange routeExchange(){//持久化 非自动删除return ExchangeBuilder.directExchange(route).build();}// 创建初始化RabbitAdmin对象Beanpublic RabbitAdmin RabbitAdminRoute(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin new RabbitAdmin(connectionFactory);// 只有设置为 truespring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);// 声明直流交换机rabbitAdmin.declareExchange(routeExchange());return rabbitAdmin;}// 消费者发送消息keydev,test,prodGetMapping(/router)public R router(RequestParam String msg,RequestParam String routerKey){MapString, Object map createMsg(msg);rabbitTemplate.convertAndSend(route,routerKey,map);return R.ok();}/*** 路由式消费者1** param message 消息属性* param channel 通道* param msg 消息内容*/RabbitListener(bindings QueueBinding(// declare false生产者已定义交换机此处不再声明交换机value Queue, exchange Exchange(name route, declare false),key {prod}//路由键))public void route1(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();String routingKey properties.getReceivedRoutingKey();log.error(路由模式方法1的消费者1收到:{},路由键:{}, msg, routingKey);//手动回执不批量签收,回执后才能处理下一批消息long tag properties.getDeliveryTag();channel.basicAck(tag, false);} catch (Exception e) {log.error(this.getClass().getName());}}/*** 路由式消费者2** param message 消息属性* param channel 通道* param msg 消息内容*/RabbitListener(bindings QueueBinding(// declare false生产者已定义交换机此处不再声明交换机value Queue, exchange Exchange(name route, declare false),key {dev,test}//路由键))public void route2(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();String routingKey properties.getReceivedRoutingKey();log.error(路由模式方法1的消费者2收到:{},路由键:{}, msg, routingKey);//手动回执不批量签收,回执后才能处理下一批消息long tag properties.getDeliveryTag();channel.basicAck(tag, false);} catch (Exception e) {log.error(this.getClass().getName());}}5.topic 主题模式(路由模式的一种) 路由功能添加模糊匹配 消息生产者生产消息,把消息交给交换机 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费 topic必须是 星号或#.dev.星号,不能以 molo/pcs/q0/*/data_up 这样匹配不了 /*** 定义主题交换机* return*/Beanpublic Exchange themeExchange(){//持久化 非自动删除return ExchangeBuilder.topicExchange(topic).build();}//创建初始化RabbitAdmin对象Beanpublic RabbitAdmin rabbitAdminTopic(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin new RabbitAdmin(connectionFactory);// 只有设置为 truespring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(themeExchange());return rabbitAdmin;}// 生产者routerKey,****.dev.********.test.****,分别走消费者1和消费者2// 通配符*#不能和 / 一起GetMapping(/topic)public R topic(RequestParam String msg, RequestParam String routerKey) {MapString, Object map createMsg(msg);rabbitTemplate.convertAndSend(topic, routeKey, map);return R.ok();}/*** 主题方法1的消费者1** param message 消息属性* param channel 通道* param msg 消息内容*/RabbitListener(bindings QueueBinding(// declare false生产者已定义交换机此处不再声明交换机value Queue, exchange Exchange(name topic, type ExchangeTypes.TOPIC),key {#.dev.*}))public void topic1(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();String routingKey properties.getReceivedRoutingKey();log.error(主题模式方法1的消费者1收到:{},路由键:{}, msg, routingKey);//手动回执不批量签收,回执后才能处理下一批消息long tag properties.getDeliveryTag();channel.basicAck(tag, false);} catch (Exception e) {log.error(this.getClass().getName());}}/*** 路由式方法1的消费者2* # 号匹配多个 .分隔* param message 消息属性* param channel 通道* param msg 消息内容*/RabbitListener(bindings QueueBinding(// declare false生产者已定义交换机此处不再声明交换机value Queue, exchange Exchange(name topic, type ExchangeTypes.TOPIC),key {#.molo.*}))public void topic2(Message message, Channel channel, JSONObject msg) {try {MessageProperties properties message.getMessageProperties();String routingKey properties.getReceivedRoutingKey();log.error(主题模式方法1的消费者2收到:{},路由键:{}, msg, routingKey);//手动回执不批量签收,回执后才能处理下一批消息long tag properties.getDeliveryTag();channel.basicAck(tag, false);} catch (Exception e) {log.error(this.getClass().getName());}}6.RPC (基于消息的远程过程调用)
RPC即客户端远程调用服务端的方法 使用MQ可以实现RPC的异步调用基于Direct交换机实现流程如下客户端即是生产者也是消费者向RPC请求队列发送RPC调用消息同时监听RPC响应队列。服务端监听RPC请求队列的消息收到消息后执行服务端的方法得到方法返回的结果。服务端将RPC方法 的结果发送到RPC响应队列。客户端RPC调用方监听RPC响应队列接收到RPC调用结果。
延时队列、循环队列、兜底机制、定时任务
1.延时队列
使用TTL死信队列组合实现延迟队列的效果。
TTL 是 RabbitMQ 中一个消息或者队列的属性表明一条消息或者该队列中的所有消息的最大存活时间单位是毫秒。换句话说如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列那么这 条消息如果在TTL 设置的时间内没有被消费则会成为死信。如果同时配置了队列的TTL 和消息的 TTL那么较小的那个值将会被使用。TTL并不是延时发送的意思。
死信队列Dead Letter Queue是 RabbitMQ 中的一种特殊队列用于存储无法正常被消费的消息。当消息满足一定条件时例如消息过期、被拒绝或达到最大重试次数等情况会被发送到死信队列中以便后续进行处理。 // 消息设置TTLMapString, Object message createMsg(msg);rabbitTemplate.convertAndSend(exchange,routingKey, message,i-{MessageProperties properties i.getMessageProperties();properties.setExpiration(10000);return i;});// 队列设置TTLQueueBuilder.durable(delayedQueue).withArgument(x-message-ttl,10000).build(); 可以在队列指定TTL但这样并不灵活所以在生产者那指定TTL // 配置类public static final String YS_QUEUE ys_queue;public static final String YS_EXCHANGE ys_exchange;public static final String YS_ROUTING_KEY ys_routing_key;// 死信队列、交换机、路由KEYpublic static final String DLX_QUEUEdlx_queue;public static final String DLX_EXCHANGEdlx_exchange;public static final String DLX_ROUTING_KEYdlx_routing_key;// 普通的交换机及队列Beanpublic Queue normalQueue(){Map map new HashMap();// message在该队列queue的存活时间最大为10秒//map.put(x-message-ttl, 10000);// x-dead-letter-exchange参数是设置该队列的死信交换器DLXmap.put(x-dead-letter-exchange, DLX_EXCHANGE);// x-dead-letter-routing-key参数是给这个DLX指定路由键map.put(x-dead-letter-routing-key, DLX_ROUTING_KEY);return new Queue(YS_QUEUE,true,false,false,map);}Beanpublic DirectExchange normalDirectExchange(){return new DirectExchange(YS_EXCHANGE);}Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).with(YS_ROUTING_KEY);}// 死信交换机及队列Beanpublic Queue dlxQueue(){return QueueBuilder.durable(DLX_QUEUE).build();}Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange(DLX_EXCHANGE);}Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}// 生产者 设置setExpiration 消息存活时间这样更灵活// 超过Time这个时间 就会走到死信队列里面达到延时效果GetMapping(/ysmsg)public R ysmsg(String time) {JSONObject msg new JSONObject();msg.put(msg, 死信交换机 延时发送的消息);msg.put(time, System.currentTimeMillis());rabbitTemplate.convertAndSend(RabbitmqDLXConfig.YS_EXCHANGE,RabbitmqDLXConfig.YS_ROUTING_KEY,msg, i - {MessageProperties properties i.getMessageProperties();properties.setExpiration(time);return i;});return R.ok();}使用RabbitMQ官方延迟插件实现延时队列效果。
docker部署的时候已经安装了插件直接使用
不需要死信交换机和死信队列支持消息延迟投递消息投递之后没有到达投递时间是不会投递给队列
而是存储在一个分布式表当投递时间到达才会投递到目标队列
public static final String YS_QUEUE_NAME YS_queue;public static final String YS_EXCHANGE_NAME YS_exchange;public static final String YS_ROUTING_KEY YS_routingKey;Beanpublic Queue delayedQueue(){return new Queue(YS_QUEUE_NAME);}/*** 自定义交换机 定义一个延迟交换机* return*/Beanpublic CustomExchange delayedExchange(){MapString, Object args new HashMap(1);// 自定义交换机的类型args.put(x-delayed-type, direct);return new CustomExchange(YS_EXCHANGE_NAME, x-delayed-message, true, false, args);}Beanpublic Binding bindingDelayedQueue(){return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(YS_ROUTING_KEY).noargs();}// 生产者 由setExpiration改为setDelay延时毫秒消费者监听队列即可GetMapping(/ysmsg)public R ysmsg(String time) {JSONObject msg new JSONObject();msg.put(msg, 延时发送的消息);msg.put(time, System.currentTimeMillis());rabbitTemplate.convertAndSend(RabbitmqDLXConfig.YS_EXCHANGE_NAME, RabbitmqDLXConfig.YS_ROUTING_KEY, msg, i -{i.getMessageProperties().setDelay(Integer.parseInt(time));return i;});return R.ok();}2.循环队列
Rabbitmq里并没有循环队列的概念多数都是通过消费者来判断是否重新入队或是转到其它队列
也可以设置消息的重试次数。
手动确认机制下如果消费者一直不确认消息RabbitMQ 将会将该消息重新投递给其他消费者或当前消费者。
3.兜底机制
消息重试: 将处理失败的消息重新投递给消费者或其他消费者进行重试。您可以使用 RabbitMQ 的重试机制例如使用 channel.basicReject()或channel.basicNack()来将消息重新放回队列中以供后续的处理尝试。
死信队列: 在消息处理失败时将消息发送到一个专门的死信队列。死信队列是一个存储无法被消费者正常处理的消息的队列。您可以定义一个死信交换机和死信队列并将处理失败的消息路由到该死信队列。然后您可以根据需要对死信队列中的消息进行分析、转发或进一步处理。
4.定时任务
定时任务使用延时队列就可以办到