网站备案是一年一次吗,科技公司网站模版,大学学校类网站设计,广告联盟怎么赚钱ActiveMQ消息中间件详解
下载地址#xff1a;https://activemq.apache.org/activemq-5015009-release
1、MQ的产品种类
1.1、消息中间件的特性/共同特性/共同维度
Kafka#xff08;大数据专用、由java/scala编写#xff09; API发送和接收MQ的高可用性MQ的集群和容错配置…ActiveMQ消息中间件详解
下载地址https://activemq.apache.org/activemq-5015009-release
1、MQ的产品种类
1.1、消息中间件的特性/共同特性/共同维度
Kafka大数据专用、由java/scala编写 API发送和接收MQ的高可用性MQ的集群和容错配置MQ持久化 radis 特性持久化 延时发送/定时投递签收机制Spring整合 RabbitMQerlang编写 API发送和接收MQ的高可用性MQ的集群和容错配置MQ持久化 radis 特性持久化 延时发送/定时投递签收机制Spring整合 RocketMQjava编写 API发送和接收MQ的高可用性MQ的集群和容错配置MQ持久化 radis 特性持久化 延时发送/定时投递签收机制Spring整合 ActiveMQ API发送和接收MQ的高可用性MQ的集群和容错配置MQ持久化 radis 特性持久化 延时发送/定时投递签收机制Spring整合
1.2 入门场景使用概述
订单秒杀系统下单之后存在在业务的流程
读取订单、库存检查、库存冻结、余额检查、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额冻结、库存解冻
RPC接口基本是同步调用整体的服务性能遵循“木桶理论”即整体系统的耗时取决于最慢的那个接口。比如A调用B/C/D都是50 ms但是B调用B1花费的时间为2000 ms那么将会拖累整个系统的服务性能。 注在设计系统时明确达到的目标
要做到系统解耦当新的模块接进来时要做到代码的改动最小能够解耦设置流量缓冲池可以让后端按照系统自身的吞吐能力进行消费不被冲垮能够削峰强弱依赖梳理能将非常关键调用链路的操作异步化并提升整体系统的吞吐能力能够异步
消息中间件的作用解耦、削峰、异步
定义发送者把消息发送给消息服务器消息服务器将消息存放在若干队列/主题中在合适的时候消息服务器会将消息转发给消息接收者。在这个过程中发送和接收都是异步的也就是发送无需等待而且发送者和接收者的生命周期也没有必然的关系尤其在发布pub/订阅sub模式下也可以完成一对多的通信即让一个消息有多个接收者。 队列queue相当于发短信一对一。
主题(topic)相当于朋友圈需要订阅公众号一对多。
特点
异步处理应用系统之间解耦削峰 ActiveMQ的解压安装
在官网进行下载下载地址https://activemq.apache.org/activemq-5015009-release 注建议下载linux版本消息队列大多数情况都是在集群的环境下进行部署而集群的使用大多数是使用linux
上传linux压缩包并进行解压注外来文件放在/opt下 进行启动(在bin目录下)
./activemq start #启动
./activemq stop #关闭
./activemq restart #重启查看启动是否成功activemq的默认进程编号为61616查看进程是否被占用
netstat -anp|grep 61616
lsof -i 61616
ps -ef|grep activemq
#可以访问http://localhost:8161 可现可视化界面
#默认的用户名/密码 admin/admin注页面访问点击进行登录
携带日志启动
./activemq start 路径/run_activemq.log1.3、JMS编码总体架构 2、代码编写
IDEA创建新的maven工程并引进相应版本的依赖。可在你下载ActiveMQ的版本网页查找对应版本的maven依赖 dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all/artifactIdversion5.15.9/version
/dependency2.1、队列(Queues)实现
注ActiveMQ的访问地址的方式为tcp的方式
进行生产者进行消息发送
//生产者
public static void queueDemo() throws JMSException {String pathtcp://192.168.160.128:61616/;String name queue01;//创建连接工厂,采用默认的用户名和密码ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory(path);//创建连接并启动Connection connection connectionFactory.createConnection();connection.start();//创建会话session第一个参数是事务第二个参数是签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地Queue queue session.createQueue(name);//创建消息生产者MessageProducer producer session.createProducer(queue);//进行消息发送for (int i 0;i3;i){//创建消息String message 这是第i条消息;//创建消息Message textMessage session.createTextMessage(message);//消息发送producer.send(textMessage);}producer.close();session.close();connection.close();System.err.println(消息发送完成);}//消费者
public static void queueConsumersDemo() throws JMSException {String path tcp://192.168.160.128:61616;String name queue01;//创建连接工厂ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory(path);//创建链接Connection connection connectionFactory.createConnection();//启动connection.start();//创建sessionSession session connection.createSession(false, AUTO_ACKNOWLEDGE);//创建消费者Queue queue session.createQueue(name);//创建消费者MessageConsumer consumer session.createConsumer(queue);while (true){//可以通过recive设置等待时间当超时时消费者自动关闭//Message receive consumer.receive(4000L);Message receive consumer.receive();if (receive ! null){System.err.println(receive);}}}注tcp对应的进程的端口为61616注意连接的地址 队列监听setMessageListener public static void setListener() throws JMSException, IOException {ActiveMQConnectionFactory MQ new ActiveMQConnectionFactory(path);Connection conn MQ.createConnection();conn.start();Session session conn.createSession(false, AUTO_ACKNOWLEDGE);Queue queue session.createQueue(name);MessageConsumer consumer session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {Overridepublic void onMessage(Message message) {if (message!null ){try {
// TextMessage receive (TextMessage) consumer.receive();TextMessage textMessage (TextMessage) message;System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});//防止后台关闭对MQ进行监听当有消息的时候进行输出System.in.read();consumer.close();session.close();conn.close();}在当多个消费者进行等待之后生产者进行消息的产生每个消费者对消息进行平均分配类似于负载均衡。
例当两个消费者在进行等待时生产者产生6条消息则两个消费者对者6条消息进行平均分配没人三条。
2.2、主题(topic)代码实现
特点
生产者将消息发送到topic中每个消息可以有多个消费者属于1:N的关系生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息生产者生产时topic不保存消息它是无状态不能落地例如无人订阅就去生产那是一条废消息所以一般先启动消费者在启动生产者
JMS规范允许客户创建持久订阅还在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话类似于微信公众号的订阅
//主题生产者public static void topicProduce() throws JMSException {ActiveMQConnectionFactory mq new ActiveMQConnectionFactory(path);Connection connection mq.createConnection();connection.start();Session session connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic session.createTopic(topicName);MessageProducer producer session.createProducer(topic);for (int i 0; i 3; i){String tpc 这是第一条消息;TextMessage textMessage session.createTextMessage(tpc);producer.send(textMessage);}producer.close();session.close();connection.close();System.err.println(主题发送成功);}//创建消费者
public static void topicConsumer() throws JMSException, IOException {ActiveMQConnectionFactory mq new ActiveMQConnectionFactory(path);Connection connection mq.createConnection();connection.start();Session session connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic session.createTopic(topicName);MessageConsumer consumer session.createConsumer(topic);consumer.setMessageListener(new MessageListener() {Overridepublic void onMessage(Message message) {TextMessage textMessage (TextMessage) message;try {System.err.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});System.in.read();consumer.close();session.close();connection.close();}注主题的启动顺序先启动消费者在启动生产者。消费者每个都会得到一份生产者发出的所有的信息。 3、浅谈JMS与JavaEE
3.1、什么是JavaEE
JavaEE是一套使用Java进行企业级应用开发的大家一直遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配即部署企业应用程序。 3.2、什么时JMSJava消息服务
Java消息服务是指的两个应用程序之间进行异步通信的API它为标准消息协议和消息服务提供了一组通用接口包括创建、发送、读取消息等用于支持Java应用程序的开发在JavaEE中当两个应用程序使用JMS进行通信时他们之间并不是直接相连而是通过一个共同的消息收发服务组件关联起来以达到解耦、异步、削峰的效果 3.3、MQ中间件的其他落地产品 特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级支撑高吞吐10 万级高吞吐一般配合大数据类的系统来进行实时数据计算、日志采集等场景opic 数量对吞吐量的影响topic 可以达到几百/几千的级别吞吐量会有较小幅度的下降这是 RocketMQ 的一大优势在同等机器下可以支撑大量的 topictopic 从几十到几百个时候吞吐量会大幅度下降在同等机器下Kafka 尽量保证 topic 数量不要过多如果要支撑大规模的 topic需要增加更多的机器资源时效性ms 级微秒级这是 RabbitMQ 的一大特点延迟最低ms 级延迟在 ms 级以内可用性高基于主从架构实现高可用同 ActiveMQ非常高分布式架构非常高分布式一个数据多个副本少数机器宕机不会丢失数据不会导致不可用消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置可以做到 0 丢失同 RocketMQ功能支持MQ 领域的功能极其完备基于 erlang 开发并发能力很强性能极好延时很低MQ 功能较为完善还是分布式的扩展性好功能较为简单主要支持简单的 MQ 功能在大数据领域的实时计算以及日志采集被大规模使用其他Apache软件基金会开发、起步较早但没有经过大量吞吐场景验证目前社区不是很活跃开源稳定社区活跃度高阿里出品目前已交给Apache但社区活跃度较低Apache软件基金会开发、开源、高通吐量社区活跃度高
3.4、JMS组成结构和特点
JMS provider 实现JMS接口和规范的消息中间件也就是我们的MQ服务器JMS producer 消息的生产者 创建和发送消息的客户端应用JMS consumer 消息消费者 接受和处理JMS消息的终端JMS message 产生的消息信息体 消息头消息属性消息体 封装具体的消息数据5中消息体格式发送和接收的消息体类型必须一一对应
注发送什么类型的消息就得接收什么类型的消息要一一对应。
消息持久模式与非持久模式
持久性应该被传送“一次仅仅一次”这意味着如果JMS提供者出现故障该消息不会丢失。他会在服务器恢复之后再次传递注主要没有被发送便会一直在消息服务器中存储非持久最多会传送一次只要服务器出现故障消息就会被丢失
消息头属性
JMSDestination 消息发动的目的地JMSDeliveryMode 消息是否持久JMSExpiration 消息的过期时间默认时永不过期可设置消息在一定时间之后会过期。消息的过期时间Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive的值为0表示消息永不过期如果发送后在消息过期时间之后消息还没有被发送到消息的目的地则该消息被清除。JMSPriority 消息优先级从0-9十个级别0-4是普通消息5-9是加急消息。JMS不要求MQ严格按照者是个优先级发送消息但必须保重加急消息要先于普通消息到达默认级别是4JMSMessageId 消息ID消息的唯一识别方式。
消息体的五种属性
TextMessage 字符串类型MapMessage Map类型BytesMessage 字节类型二进制数组消息StreamMessage 流类型ObjectMessage 对象类型
消息属性
如果需要除消息头字段以外的值那么可以使用消息属性识别/去重/重点标注等操作非常有用的方法。他们是以属性名和**属性值对K:V**的形式指定的可以将属性是为消息头的扩展属性指定一些消息头没有包括的附加消息比如可以在属性里指定消息选择器。
消息的属性就像可以分配给一条消息的附加消息头一样他们可以允许开发者添加有关消息的不透明附加消息他们还用于暴漏消息选择器在消息过滤是使用的数据。
例
TextMessage message session.createTextMessage();
message.setText(text);
message.setStringProperty(username,ABC)// 进行消息自定义3.5、消息持久性
3.5.1、持久的队列Queue
消息队列中默认的消息为消息持久化
采用持久性消息当ActiveMQ宕机时未消费的消息的数量保持不变有利于保持消息而不被丢失。
队列的默认传送模式此模式保证这些消息被成功的发送一次和成功的使用一次。对于这些消息可靠性是优先考虑的因素。
可靠性另一个重要的方面是确保持久性消息传送至目标后消息服务在向消费者传送它们之前不会丢失这些消息。
3.5.2、持久主题topic
先启动订阅在启动生产 //持久化主题消息消费者public static void lastingTopicConsumer() throws JMSException, IOException {ActiveMQConnectionFactory mq new ActiveMQConnectionFactory(path);Connection connection mq.createConnection();connection.setClientID(name);Session session connection.createSession(false, AUTO_ACKNOWLEDGE);Topic topic session.createTopic(topicName);TopicSubscriber topicSubscriber session.createDurableSubscriber(topic,remark...);connection.start();//主题的订阅者Message receive topicSubscriber.receive();while (receive!null){TextMessage textMessage (TextMessage) receive;System.out.println(textMessage.getText());receive topicSubscriber.receive(5000);}System.in.read();session.close();connection.close();}一定要先运行一次消费者类似于订阅这个主题然后在运行生产者发送信息无论消费者是否在线都会接收到不在线的话下次连接的时候会把没有收到的消息接收下来
4、ActiveMQ的broker
定义相当于一个ActiveMQ服务器实例
Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中以便随时用随时启动
在用的时候再去启动这样节省了资源也保证了可靠性。
方式
用ActiveMQ Broker作为独立的消息服务器来构建java应用
ActiveMQ也支持在VM中通信基于嵌入式的broker能够无缝的集成其他java应用
所需mvn依赖 //ActiveMQ核心依赖dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all/artifactIdversion5.15.9/version/dependency// 整合spring所需要的依赖 dependencygroupIdorg.apache.xbean/groupIdartifactIdxbean-spring/artifactIdversion3.16/version/dependency//进行json数据格式转换在进行Java内嵌activqMQ时需要引进否则会报错dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.9.5/version/dependency代码实现
//相当于在本机上边开启了一个ActiveMQ的服务
public static void main(String[] args) throws Exception {BrokerService brokerService new BrokerService();brokerService.setUseJmx(true);brokerService.addConnector(tcp://localhost:61616);brokerService.start();}5、spring整合ActiveMQ
application.xml
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:aophttp://www.springframework.org/schema/aopxmlns:txhttp://www.springframework.org/schema/txxmlns:contexthttp://www.springframework.org/schema/contextxsi:schemaLocationhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd!--开启包的自动扫描--context:component-scan base-packagecom.atguigu.activemq/bean idjmsFactory classorg.apache.activemq.pool.PooledConnectionFactory destroy-methodstopproperty nameconnectionFactorybean classorg.apache.activemq.ActiveMQConnectionFactoryproperty namebrokerURL valuetcp:192.168.160.128//bean/propertyproperty namemaxConnections value100//beanbean iddestinationQueue classorg.apache.activemq.command.ActiveMQQueueconstructor-arg index0 valuespring-active-queue//beanbean idjsmTemplate classorg.springframework.jms.core.JmsTemplateproperty nameconnectionFactory refjmsFactory/property namedefaultDestination refdestinationQueue/property namemessageConverterbean classorg.springframework.jms.support.converter.SimpleMessageConverter//property/bean
/beans生产者
package com.atguigu.activemq;import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;Service
public class Producer {Autowiredprivate JmsTemplate jmsTemplate;public static void main(String[] args) {ApplicationContext applicationContext new ClassPathXmlApplicationContext(applicationContext.xml);Producer producer (Producer) applicationContext.getBean(Producer);producer.jmsTemplate.send(new MessageCreator() {Overridepublic Message createMessage(Session session) throws JMSException {TextMessage textMessage session.createTextMessage(********当前消息);return textMessage;}});}
}
消费者
package com.atguigu.activemq;import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;Service
public class Customer {Autowiredprivate JmsTemplate jmsTemplate;public static void main(String[] args) {ApplicationContext applicationContext new ClassPathXmlApplicationContext(applicationContext.xml);Customer customer (Customer) applicationContext.getBean(Customer);String receiveAndConvert (String) customer.jmsTemplate.receiveAndConvert();System.out.println(receiveAndConvert);}
}
mvn依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdActiveMQDemo/artifactIdversion1.0-SNAPSHOT/versiondependencies!--activemq需要的jav包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all/artifactIdversion5.15.9/version/dependencydependencygroupIdorg.apache.xbean/groupIdartifactIdxbean-spring/artifactIdversion3.16/version/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.9.5/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-jms/artifactIdversion4.3.23.RELEASE/version/dependencydependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-pool/artifactIdversion5.15.9/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-core/artifactIdversion4.3.23.RELEASE/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-context/artifactIdversion4.3.23.RELEASE/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-aop/artifactIdversion4.3.23.RELEASE/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-orm/artifactIdversion4.3.23.RELEASE/version/dependencydependencygroupIdorg.aspectj/groupIdartifactIdaspectjrt/artifactIdversion1.6.1/version/dependencydependencygroupIdorg.aspectj/groupIdartifactIdaspectjweaver/artifactIdversion1.6.8/version/dependencydependencygroupIdcglib/groupIdartifactIdcglib/artifactIdversion2.1_2/version/dependency!--下面是junit/log4等通用配置--dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.25/version/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-classic/artifactIdversion1.2.3/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.16.18/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.12/version/dependency/dependenciespropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/properties/project6、springBoot整合ActiveMQ
7、ActiveMQ的传输协议
ActiveMQ支持的client-broker通讯协议有TCP、NIO、 UDP、SSL、HttpsHttp、VM 。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。 注在ActiveMQ中默认的使用TCP协议也默认支持多种的传输协议
设置支持NIO网络协议
transportConnector namenio urinio://0.0.0.0:61618/设置进行多协议支持在5.13之后在支持auto多协议同时支持
transportConnector nameautonio uriautonio://0.0.0.0:61617?auto.protocolsdefault,stomp/8、ActiveMQ的消息存储和持久化
为了避免以为宕机之后数据的丢失需要做到重启之后可以恢复消息队列消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制又JDBC、AMQ、KahaDB(默认使用)、LevelDB无论使用哪一种持久化的机制消息存储的逻辑都是一致的。
消息存储机制
就是在发送者在发送出去之后消息中心首先将消息存储到本地的数据文件、内存数据库或者远程数据库等在试图将消息在发送给接收者成功则将消息从存储中进行删除失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置如果有未发送成功的消息则需要将消息发送出去。
KahaDB在5.3版本之后建议推荐使用KahaDB存储方式在5.4版本之后默认使用kahaDB存储方式 默认的文件的存储位置在activeMQ下的data文件中 8.1、KahaDB存储机制的原理
在KahaDB在消息目录进行存储时。只有4类文件和一个lock。以下四个文件还有一个db.free 四类文件一把锁 KahaDB会将消息存储在db-.log文件之中当一个文件已满时默认的一个文件的大小为32MB会自动创建一个新的文件进行相关的存储。例db-1.log、db-2.log …。当不会再有引用到数据文件中的任何数据消息时文件会被删除或者时归档。db.data该文件包含了持久化的BTree索引索引了消息数据记录中的消息他是消息的索引文件本质是B-TreeB树使用B-Tree作为索引执行db-.log文件中存储的消息db.free当前db.data文件那些页面是空闲的文件具体内通过是所有空闲页面的IDdb.radio是用来进行消息恢复的当KahaDB消息存储被强制退出后启动用于恢复BTree索引lock为文件的读取进行添加锁机制防止数据出现混乱。
8.2、JDBC消息存储一部分消息会被存储到数据库中
注对于长时间的存储建议使用JDBC的存储方式
将数据库驱动jar包放到MQ的lib文件夹下做JDBC吃持久化的配置对文件进行修改适配 persistenceAdapter jdbcPersistenceAdapter dataSource#mysql-ds createTablesOnStartuptrue/
/persistenceAdapterdataSource指定将要引用的持久化数据库的bean名称 createTablesOnStartup 是否在启动的时候创建数据表默认值是true 这样每次启动都会去创建数据表一般是第一次启动的时候设置为true 之后改成false;
数据库连接池的配置
relaxAutoCommit 表示进行自动提交bean idmysql-ds classorg.apache.commons.dbcp2.BasicDataSource destroy-methodclose property namedriverClassName valuecom.mysql.jdbc.Driver/ property nameurl valuejdbc:mysql://localhost/activemq?relaxAutoCommittrue/ property nameusername valueactivemq/ property namepassword valueactivemq/ property namepoolPreparedStatements valuetrue/
/bean 注注意配置信息将要添加的位置否则可能会报错
**
建仓SQL和建表说明
创建对应名称的数据库创建表默认表名 ACTIVEMQ_MSGSACTIVEMQ_ACKSACTIVEMQ_LOCK 如果新建数据库OK上述的配置OK代码运行OK3表会自动生成。
ACTIVEMQ_MSGS表字段
ID:自增数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发下哦那个消息的顺序MSGID_PRODMSG_SEQ可以足证JMS的MessageID
EXPIRATION:消息的过期时间存储的是从1970-01-01
MSG:消息本体的Java序列对象的二进制数据
PRIORITY:优先级0-9数值越大优先级越高ACTIVEMQ_ACKS表字段用于存储订阅关系如果是持久化topic订阅者和服务器的订阅关系在这个表里面进行保存
CONTAINER:消息的介绍
SUB_DEST:如果使用的是Static集群这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器可以选择之消费满足条件的信息条件可以用自定义的属性进行实现支持多属性AND和OR
LAST_ACKED_ID:记录消费过的消息的ID可能出现的错误 解决方法
在保证activeMQ在使用JDBC消息持久化时所需要的jar包以及数据库密码用户、地址都没有错误的情况下造成报错的原因可能是数据库远程连接的权限没有被放开导致数据库在进行远程连接时连接失败。这种情况下需要放开数据库远程连接的权限。
在本地通过连接数据库的工具或者cmd命令框进入到数据库都可以在这我采用cmd
mysql -u root -p查看mysql中存在的库,并使用mysql库
show databases;
use mysql;查看对应表
select User,Host from user;注如果权限没有被放开的情况下root所对应的Host为localhost这是我们需要将其改为%
update set Host% where Userroot;进行配置刷新
flush privileges;注在修改完成之后一定要进行配置刷新否则相关修改不起作用
在服务器重启activeMQ进行日志查看 启动成功activeMQ会在你对应的数据库中创建相关的表 注在使用消息持久化存储机制时一定要将activeMQ设置为持久化。否则不会将信息存储到数据库中
点对点类型 在DeliveryMode设置为NODE_PERSISTENCE(非持久化)时消息保存在内存中
在DeliveryMode设置为PERSISTENCE(持久化)消息保存在broker的相应的文件或者数据库中
而且点对点类型中消息一旦被消费消息就会在存储的位置进行删除操作。
在使用Topic时在消息持久化消息在被消费的时候消息不会被删除。
8.3、开发中遇到的问题
如果是queue
在没有消费者消费的情况下会将信息保存在activemq_msgs表中只要有任意消费者已经消费过了消费之后这些消息将被删除。
如果时topic
一般是先启动消费订阅然后在生产的情况下会将消息保存到activemq_acks表中
数据库jar包
记得需要使用到的相关的jar文件放置到lib目录下mysql-jdbc驱动的jar包和对应的数据库连接池的jar包。默认是dbcp
createTableOnStartup属性 在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时在第一次启动ActiveMQ时ActiveMQ服务节点会自动在数据库中创建相关的数据库表启动完成后可以去掉这个属性或者是将属性值修改为false。其属性值默认为true建议在不用的时候将属性值修改为false
下划线的问题
“java.lang.lllegalStateExcepton:BeanFactory not initialized or already closed”产生报错的原因是因为您的操作系统的机器名中存在“_”符号请修改机器名并重启可以解决相关问题。
8.4、高性能缓存ActiveMQ Journal
在activemq.xml中进行配置
persistenceFactoryjournalPersistenceAdapterFactory journalLogFiles4journalLogFileSizeuseJournaltrueuseQuickJournaltruedataSource#mysql-dsdataDirectoryactivemq-data/
/persistenceFactory 配置完成之后对activeMQ进行重启操作
9、ActiveMQ的高级特性
9.1、ActiveMQ异步传输以及确认发送成功
注ActiveMQ默认的消息发送方式为异步传输但是建议在进行代码编写时再次将消息传输的方式设置为异步 注当消息设置为不采用事务但是却将消息设置为持久化时MQ会默认将消息的传输 方式设置为同步消息传输。这样的传输方式只有当消息发送出去之后只有被接受返回成功之后才会进行下一个消息的处理容易造成堵塞。
注在进行异步消息传输时MQ允许存在极少量的数据丢失也会存在极少量数据丢失的情况。
例
cf new ActiveMQConnectionFactory(tcp://locahost:61616?jms.useAsyncSendtrue);((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);((ActiveMQConnection)connection).setUseAsyncSend(true);注在使用异步消息队列是注意要采用消息的回调来确认消息是否发送成功 public static void queueDemo() throws JMSException {
// String pathtcp://192.168.160.128:61616/;String name queue01;//创建连接工厂,采用默认的用户名和密码ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory(tcp://locahost:61616?jms.useAsyncSendtrue);//创建连接并启动Connection connection connectionFactory.createConnection();connection.start();//创建会话session第一个参数是事务第二个参数是签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地Queue queue session.createQueue(name);//创建消息生产者ActiveMQMessageProducer producer (ActiveMQMessageProducer) session.createProducer(queue);//进行消息发送for (int i 0;i3;i){//创建消息String message 这是第i条消息;//创建消息Message textMessage session.createTextMessage(message);textMessage.setJMSMessageID(UUID.randomUUID().toString().replaceAll(-,)); //设置消息的id//消息发送producer.send(textMessage, new AsyncCallback() {Overridepublic void onSuccess() {//进行消息的回调System.out.println(发送成功的消息textMessage.toString());}Overridepublic void onException(JMSException e) {//进行消息的回调System.out.println(发送失败的消息textMessage.toString());}});}producer.close();session.close();connection.close();System.err.println(消息发送完成);}9.2、消息的延时发送和定时发送 9.3、分发策略
9.4、ActiveMQ重试机制重新交付政策
官网地址https://activemq.apache.org/redelivery-policy
引起消息重发的情况
Client用了transaction且在session中调用了rollback();(没被签收被回调)
Client用了transactions且在调用commit()之前关闭或者时没有commit(事务没有被提交)
Client在CLIENT_ACKNOWLEDGE的传递模式下在session中调用了recover()
默认每一秒钟发送6次当消息发送超过最大次数时该消息会被标志位异常消息最终会被存储到死信队列中
常用属性 9.5、死信队列
处理发送失败的消息
一般的生产环境之中MQ一般会设置两个队列核心业务队列和死信队列核心业务队列就是处理正常的业务信息死信队列主主要是处理异常的业务队列信息
将所有的DeadLetter保存到一个共享的队列之中是ActiveMQ的默认的策略
共享队列默认为ActiveMQ.DLQ可以通过deadLetterQueue属性进行设定
deadLetterStrategeshareDeadLetterStrategy deadLetterQueueDLQ-QUEUE/
/deadLetterStratege 可以将定时自动删除死信队列中的消息或者可以存储非持久的异常消息
9.6、如何保证消息不被重复消费
可以通过radis进行相关的解决