做网站需要美工吗,怎样查看网站开发,网站与经营网站,怎样推广小程序1 ActiveMQ简介1.1 ActiveMQ是什么ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。1.1.1 JMS概述全称#xff1a;Java Message Service #xff0c;即为Java消息服务#xff0c;是一套java消息服务的API标准。(标准即接口)实现了JMS标准的系统#xff0c;称之…1 ActiveMQ简介1.1 ActiveMQ是什么ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。1.1.1 JMS概述全称Java Message Service 即为Java消息服务是一套java消息服务的API标准。(标准即接口)实现了JMS标准的系统称之为JMS Provider。1.1.2 消息队列1.1.2.1 概念消息队列是在消息的传输过程中保存消息的容器提供一种不同进程或者同一进程不同线程直接通讯的方式。Producer消息生产者负责产生和发送消息到BrokerBroker消息处理中心。负责消息存储、确认、重试等一般其中会包含多个queueConsumer消息消费者负责从Broker中获取消息并进行相应处理1.1.2.2 常见消息队列应用(1)、ActiveMQActiveMQ 是Apache出品最流行的能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。(2)、RabbitMQRabbitMQ是一个在AMQP基础上完成的可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。(3)、RocketMQ由阿里巴巴定义开发的一套消息队列应用服务。1.2 ActiveMQ能做什么(1)实现两个不同应用(程序)之间的消息通讯。(2)实现同一个应用不同模块之间的消息通讯。(确保数据发送的稳定性)1.3 ActiveMQ下载ActiveMQ下载地址http://activemq.apache.org/download-archives.html--可供下载的历史版本--说明ActiveMQ 5.10.x以上版本必须使用JDK1.8才能正常使用。ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。--根据操作系统选择下载版本。(本教程下载Linux版本)1.4 ActiveMQ主要特点(1)支持多语言、多协议客户端。语言: Java,C,C,C#,Ruby,Perl,Python,PHP。应用协议OpenWire,Stomp REST,WS Notification,XMPP,AMQP(2)对Spring的支持ActiveMQ可以很容易整合到Spring的系统里面去。(3)支持高可用、高性能的集群模式。2 入门示例2.1 需求使用ActiveMQ实现消息队列模型。2.2 配置步骤说明(1)搭建ActiveMQ消息服务器。(2)创建一个java项目。(3)创建消息生产者发送消息。(4)创建消息消费者接收消息。2.3 第一部分搭建ActiveMQ消息服务器2.3.1 第一步下载、上传至Linux--说明确保已经安装了jdk2.3.2 第二步安装到/usr/local/activemq目录(1)解压到/usr/local目录下[rootnode07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local(2)修改名称为activemq[rootnode07192 ~]# cd /usr/local/[rootnode07192 local]# mv apache-activemq-5.9.0/ activemq2.3.3 第三步启动ActiveMQ服务器--说明ActiveMQ是免安装软件解压即可启动服务。[rootnode07192 local]# cd activemq/bin[rootnode07192 bin]# ./activemq start--查看ActiveMQ启动状态[rootnode07192 bin]# ./activemq status2.3.4 第四步浏览器访问ActiveMQ管理界面2.3.4.1 Step1查看ActiveMQ管理界面的服务端口。在/conf/jetty.xml中--访问管理控制台的服务端口默认为8161[rootnode07192 bin]# cd ../conf[rootnode07192 conf]# vim jetty.xml2.3.4.2 Step2查看ActiveMQ用户、密码。在/conf/users.properties中:--默认的用户名、密码均为amdin[rootnode07192 conf]# vim users.properties2.3.4.3 Step3访问ActiveMQ管理控制台。地址http://ip:8161/--注意防火墙是没有配置该服务的端口的。因此要访问该服务必须在防火墙中配置。(1)修改防火墙开放8161端口[rootnode07192 conf]# vim /etc/sysconfig/iptables(2)重启防火墙[rootnode07192 conf]# service iptables restart(3)登录管理控制台--登陆用户名、密码均为admin--控制台主界面--搭建ActiveMQ服务器成功!!!2.4 第二部分创建java项目导入jar包--导包说明ActiveMQ的解压包中提供了运行ActiveMQ的所有jar。--创建项目2.5 第三部分创建消息生成者发送消息--说明ActiveMQ是实现了JMS规范的。在实现消息服务的时候必须基于API接口规范。2.5.1 JMS常用的API说明下述API都是接口类型,定义在javax.jms包中是JMS标准接口定义。ActiveMQ完全实现这一套api标准。2.5.1.1 ConnectionFactory链接工厂, 用于创建链接的工厂类型。2.5.1.2 Connection链接用于建立访问ActiveMQ连接的类型,由链接工厂创建。2.5.1.3 Session会话, 一次持久有效、有状态的访问由链接创建。2.5.1.4 Destination Queue Topic目的地, 即本次访问ActiveMQ消息队列的地址由Session会话创建。(1)interfaceQueue extends Destination(2)Queue队列模型只有一个消费者。消息一旦被消费默认删除。(3)Topic主题订阅中的消息会发送给所有的消费者同时处理。2.5.1.5 Message消息在消息传递过程中数据载体对象是所有消息【文本消息TextMessage对象消息ObjectMessage等】具体类型的顶级接口可以通过会话创建或通过会话从ActiveMQ服务中获取。2.5.1.6 MessageProducer消息生成者, 在一次有效会话中,用于发送消息给ActiveMQ服务的工具由Session会话创建。2.5.1.7 MessageCustomer消息消费者【消息订阅者消息处理者】, 在一次有效会话中,用于ActiveMQ服务中获取消息的工具由Session会话创建。我们定义的消息生产者和消费者都是基于上面API实现的。2.5.2 第一步创建MyProducer类定义sendMessage方法package cn.gzsxt.mq.producer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;public class MyProducer {// 定义链接工厂ConnectionFactory connectionFactory null;// 定义链接Connection connection null;// 定义会话Session session null;// 定义目的地Destination destination null;// 定义消息生成者MessageProducer producer null;// 定义消息Message message null;public void sendToMQ(){try{/** 创建链接工厂* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号* 此链接基于TCP/IP协议.*/connectionFactory new ActiveMQConnectionFactory(admin, admin, tcp://192.168.23.13:61616);// 创建链接对象connection connectionFactory.createConnection();// 启动链接connection.start();/** 创建会话对象* 方法- connection.createSession(boolean transacted,int acknowledgeMode);* transacted - 是否使用事务,可选值为true|false* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为* Session.SESSION_TRANSACTED* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.* acknowledgeMode - 消息确认机制,可选值为:* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制*/session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列destination session.createQueue(test-mq);// 创建消息生成者,创建的消息生成者与某目的地对应,即方法参数目的地.producer session.createProducer(destination);// 创建消息对象,创建一个文本消息,此消息对象中保存要传递的文本数据.message session.createTextMessage(hello,activeme);// 发送消息producer.send(message);System.out.println(消息发送成功);}catch(Exception e){e.printStackTrace();System.out.println(访问ActiveMQ服务发生错误!!);}finally{try {// 回收消息发送者资源if(null ! producer)producer.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收会话资源if(null ! session)session.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收链接资源if(null ! connection)connection.close();} catch (JMSException e) {e.printStackTrace();}}}}2.5.3 第二步创建一个测试类MessageTest--添加junit类库快捷键ctrl1package cn.gzsxt.mq.test;import org.junit.Test;import cn.gzsxt.mq.producer.MyProducer;public class MessageTest {Testpublic void sendToMQ(){MyProducer producer new MyProducer();producer.sendToMQ();}}2.5.4 第三步测试(1)设置防火墙配置61616端口。注意修改之后重启防火墙。(2)测试结果--查看控制台--查看ActiveMQ管理控制界面--消息发送成功!!!2.6 第四部分创建消息消费者消费消息2.6.1 第一步创建MyConsumer类package cn.gzsxt.mq.consumer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** ClassName:MyConsumer* Description: 消息消费者代码*/public class MyConsumer {// 定义链接工厂ConnectionFactory connectionFactory null;// 定义链接Connection connection null;// 定义会话Session session null;// 定义目的地Destination destination null;// 定义消息消费者MessageConsumer consumer null;// 定义消息Message message null;public void recieveFromMQ(){try{/** 创建链接工厂* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号* 此链接基于TCP/IP协议.*/connectionFactory new ActiveMQConnectionFactory(admin, admin, tcp://192.168.23.13:61616);// 创建链接对象connection connectionFactory.createConnection();// 启动链接connection.start();/** 创建会话对象* 方法- connection.createSession(boolean transacted,int acknowledgeMode);* transacted - 是否使用事务,可选值为true|false* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为* Session.SESSION_TRANSACTED* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.* acknowledgeMode - 消息确认机制,可选值为:* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制*/session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列destination session.createQueue(test-mq);// 创建消息消费者,创建的消息消费者与某目的地对应,即方法参数目的地.consumer session.createConsumer(destination);// 从ActiveMQ服务中获取消息message consumer.receive();TextMessage tMsg (TextMessage) message;System.out.println(从MQ中获取的消息是:tMsg.getText());}catch(Exception e){e.printStackTrace();System.out.println(访问ActiveMQ服务发生错误!!);}finally{try {// 回收消息消费者资源if(null ! consumer)consumer.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收会话资源if(null ! session)session.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收链接资源if(null ! connection)connection.close();} catch (JMSException e) {e.printStackTrace();}}}}2.6.2 第二步修改测试类MessageTest新增测试方法Testpublic void recieveFromMQ(){MyConsumer consumer new MyConsumer();consumer.recieveFromMQ();}2.6.3 第三步测试--查看Eclipse控制台--查看ActiveMQ管理控制界面--消息被消费了测试成功!!!3 ActiveMQ监听器问题在前面的示例中我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候我们需要多次运行消费者才能消费完这些消息。很麻烦如何解决这个问题呢我们希望一次将所有的消息全部接收。答使用ActiveMQ监听器来监听队列持续消费消息。3.1 配置步骤说明(1)创建一个监听器对象。(2)修改消费者代码加载监听器。3.2 配置步骤3.2.1 第一步创建监听器MyListener类--说明自定义监听器需要实现MessageListener接口package cn.gzsxt.mq.listener;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class MyListener implements MessageListener{Overridepublic void onMessage(Message message) {if(null!message){TextMessage tMsg (TextMessage) message;try {System.out.println(从MQ中获取的消息是:tMsg.getText());} catch (JMSException e) {e.printStackTrace();}}}}3.2.2 第二步修改MyConsumer代码加载监听器--说明监听器需要持续加载因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束。(实际开发中使用web项目加载)package cn.gzsxt.mq.consumer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import cn.gzsxt.mq.listener.MyListener;/*** ClassName:MyConsumer* Description: 消息消费者代码*/public class MyConsumer {// 定义链接工厂ConnectionFactory connectionFactory null;// 定义链接Connection connection null;// 定义会话Session session null;// 定义目的地Destination destination null;// 定义消息消费者MessageConsumer consumer null;// 定义消息Message message null;public Message recieveFromMQ(){try{/** 创建链接工厂* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号* 此链接基于TCP/IP协议.*/connectionFactory new ActiveMQConnectionFactory(admin, admin, tcp://192.168.23.13:61616);// 创建链接对象connection connectionFactory.createConnection();// 启动链接connection.start();/** 创建会话对象* 方法- connection.createSession(boolean transacted,int acknowledgeMode);* transacted - 是否使用事务,可选值为true|false* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为* Session.SESSION_TRANSACTED* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.* acknowledgeMode - 消息确认机制,可选值为:* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制*/session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列destination session.createQueue(test-mq);// 创建消息消费者,创建的消息消费者与某目的地对应,即方法参数目的地.consumer session.createConsumer(destination);// // 从ActiveMQ服务中获取消息// message consumer.receive();//// TextMessage tMsg (TextMessage) message;//// System.out.println(从MQ中获取的消息是:tMsg.getText());//加载监听器consumer.setMessageListener(newMyListener());//监听器需要持续加载这里我们使用输入流阻塞当前线程结束。System.in.read();}catch(Exception e){e.printStackTrace();System.out.println(访问ActiveMQ服务发生错误!!);}finally{try {// 回收消息消费者资源if(null ! consumer)consumer.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收会话资源if(null ! session)session.close();} catch (JMSException e) {e.printStackTrace();}try {// 回收链接资源if(null ! connection)connection.close();} catch (JMSException e) {e.printStackTrace();}}return message;}}3.3 测试(1)多次运行生产者发送多条消息到队列中。(2)运行消费者。观察结果--查看Eclipse控制台一次消费了3条消息--查看ActiveMQ管理控制界面所有消息都被消费了--测试成功!!!4 ActiveMQ消息服务模式问题在入门示例中只能向一个消费者发送消息。但是有一些场景需求有多个消费者都能接收到消息比如美团APP每天的消息推送。该如何实现呢答ActiveMQ是通过不同的服务模式来解决这个问题的。所以要搞清楚这个问题必须知道ActiveMQ有哪些应用模式。4.1 PTP模式(point to point)--消息模型消息生产者生产消息发送到queue中然后消息消费者从queue中取出并且消费消息。消息被消费以后queue中不再有存储所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者但是对一个消息而言只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时消息会一直保存直到有消费消费我们的入门示例就是采用的这种PTP服务模式。4.2 TOPIC(主题订阅模式)--消息模型消息生产者(发布)将消息发布到topic中同时有多个消息消费者(订阅)消费该消息。和点对点方式不同发布到topic的消息会被所有订阅者消费。当生产者发布消息不管是否有消费者。都不会保存消息所以主题订阅模式下一定要先有消息的消费者(订阅者)后有消息的生产者(发布者)。我们前面已经实现了PTP模式下面我们来实现TOPIC模式。5 Topic模式实现5.1 配置步骤说明发表于 2019-08-01 00:00阅读 ( 411 )