手机免费建站平台下载,网页制作基础教程,重庆专业网站建设,福州网站建设思企一、MQTT介绍
1.1 什么是MQTT#xff1f;
MQTT#xff08;Message Queuing Telemetry Transport#xff0c;消息队列遥测传输协议#xff09;#xff0c;是一种基于发布/订阅#xff08;publish/subscribe#xff09;模式的“轻量级”通讯协议#xff0c;该协议构建于…一、MQTT介绍
1.1 什么是MQTT
MQTTMessage Queuing Telemetry Transport消息队列遥测传输协议是一种基于发布/订阅publish/subscribe模式的“轻量级”通讯协议该协议构建于TCP/IP协议上由IBM在1999年发布。
MQTT最大优点在于用极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。 MQTT具有协议简洁、轻巧、可扩展性强、低开销、低带宽占用等优点已经有PHPJAVAPythonCC#Go等多个语言版本基本可以使用在任何平台上。在物联网、小型设备、移动应用等方面有较广泛的应用特别适合用来当做物联网的通信协议。 1.2 MQTT特点
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的这些特点使它适用范围非常广泛。在很多情况下包括受限的环境中如机器与机器M2M通信和物联网IoT。
MQTT协议是为硬件性能有限且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议它具有以下主要的几项特性
1.使用发布/订阅消息模式提供多对多的消息发布解除应用程序耦合2.对负载内容屏蔽的消息传输3.使用TCP/IP 提供网络连接4.支持三种消息发布服务质量QoS QoS 0最多一次消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这个级别可用于如下情况环境传感器数据丢失一次数据无所谓因为不久后还会有第二次发送。QoS 1至少一次确保消息到达但消息重复可能会发生。QoS 2只有一次确保消息到达一次。这个级别可用于如下情况在计费系统中消息重复或丢失会导致不正确的结果。
5.传输数据小开销很小固定长度的头部是 2 字节协议交换最小化以降低网络流量(用极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务。) 1.3 MQTT应用场景
MQTT作为一种低开销、低带宽占用的即时通讯协议使其在物联网、小型设备、移动应用等方面有着广泛的应用。MQTT服务只负责消息的接收和传递应用系统连接到MQTT服务器后可以实现采集数据接收、解析、业务处理、存储入库、数据展示等功能。常见的应用场景主要有以下几个方面
1消息推送 如PC端的推送公告比如安卓的推送服务还有一些即时通信软件如微信、易信等也是采用的推送技术。
2智能点餐 通过MQTT消息队列产品消费者可在餐桌上扫码点餐并与商家后端系统连接实现自助下单、支付。
3信息更新 实现商场超市等场所的电子标签、公共场所的多媒体屏幕的显示更新管理。
4扫码出站 最常见的停车场扫码缴费自动起竿地铁闸口扫码进出站。 二、MQTT的角色组成
2.1 MQTT的客户端和服务端
2.1.1 服务端Broker
EMQX就是一个MQTT的Brokeremqx只是基于erlang语言开发的软件而已其它的MQ还有ActiveMQ、RabbitMQ、HiveMQ等等。
EMQX服务端下载 EMQX 2.1.2 客户端(发布/订阅)
EMQX客户端MQTTX全功能 MQTT 客户端工具
这个是用来测试验证的客户端实际项目是通过代码来实现我们消息的生产者和消费者。 2.2 MQTT中的几个概念
相比RabbitMQ等消息队列MQTT要相对简单一些只有Broker、Topic、发布者、订阅者等几部分构成。接下来我们先简单整理下MQTT日常使用中最常见的几个概念
1.Topic主题MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行.业务逻辑处理, 主题是消息的通道2.生产者MQTT消息的发送者, 他们向主题发送消息3.消费者MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息4.broker服务消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现
其实, MQTT的使用流程就是: 生产者给broker的某个topic发消息-broker通过topic进行消息的传递-订阅该主题的消费者拿到消息并进行相应的业务逻辑 三、EMQX的安装和使用
下面以Windows为例演示Windows下如何安装和使用EXQX。
step 1下载EMQ安装包配置EMQ环境
EMQX服务端下载 EMQX
step 2下载压缩包解压cmd进入bin文件夹 step 3启动EMQX服务
在命令行输入emqx start 启动服务打卡浏览器输入http://localhost:18083/ 进入登录页面。默认用户名密码 admin/public 。登录成功后会进入emqx的后台管理页面如下图所示 四、使用SpringBoot整合MQTT协议
前面介绍了MQTT协议以及如何安装和启动MQTT服务。接下来演示如何在SpringBoot项目中整合MQTT实现消息的订阅和发布。
4.1 创建工程
首先创建spring-boot-starter-mqtt父工程在父工程下分别创建消息的提供者spring-boot-starter-mqtt-provider 模块和消息的消费者spring-boot-starter-mqtt-consumer模块。 4.2 实现生产者
接下来修改生产者模块spring-boot-starter-mqtt-provider 相关的代码实现消息发布的功能模块。
4.2.1 导入依赖包
修改pom.xml 文件添加MQTT相关依赖具体示例代码如下所示 dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactIdversion5.3.2.RELEASE/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.4/version/dependency/dependencies 4.2.2 修改配置文件
修改application.yml配置文件增加MQTT相关配置。示例代码如下所示 spring:application:name: provider#MQTT配置信息mqtt:#MQTT服务地址端口号默认11883如果有多个用逗号隔开url: tcp://127.0.0.1:11883#用户名username: admin#密码password: public#客户端id(不能重复)client:id: provider-id#MQTT默认的消息推送主题实际可在调用接口是指定default:topic: topic
server:port: 8080 4.2.3 消息生产者客户端配置
创建MqttProviderConfig配置类读取application.yml中的相关配置并初始化创建MQTT的连接。示例代码如下所示 import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;Configuration
Slf4j
public class MqttProviderConfig {Value(${spring.mqtt.username})private String username;Value(${spring.mqtt.password})private String password;Value(${spring.mqtt.url})private String hostUrl;Value(${spring.mqtt.client.id})private String clientId;Value(${spring.mqtt.default.topic})private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try{//创建MQTT客户端对象client new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options new MqttConnectOptions();//是否清空session设置false表示服务器会保留客户端的连接记录订阅主题qos,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题若客户端和服务器之间的连接意外断开服务器将发布客户端的遗嘱信息options.setWill(willTopic,(clientId 与服务器断开连接).getBytes(),0,false);//设置回调client.setCallback(new MqttProviderCallBack());client.connect(options);} catch(MqttException e){e.printStackTrace();}}public void publish(int qos,boolean retained,String topic,String message){MqttMessage mqttMessage new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());//主题的目的地用于发布/订阅信息MqttTopic mqttTopic client.getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式在后台运行执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题但不等待消息传递完成返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回消息就已被客户端接受发布当连接可用将在后台完成消息传递。token mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}
} 4.2.4 生产者客户端消息回调
创建MqttProviderCallBack类并继承MqttCallback实现相关消息回调事件示例代码如下图所示 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttConsumerCallBack implements MqttCallback{/*** 客户端断开连接的回调*/Overridepublic void connectionLost(Throwable throwable) {System.out.println(与服务器断开连接可重连);}/*** 消息到达的回调*/Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format(接收消息主题 : %s,topic));System.out.println(String.format(接收消息Qos : %d,message.getQos()));System.out.println(String.format(接收消息内容 : %s,new String(message.getPayload())));System.out.println(String.format(接收消息retained : %b,message.isRetained()));}/*** 消息发布成功的回调*/Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println(String.format(接收消息成功));}
} 4.2.5 创建Controller控制器实现消息发布功能
创建SendController控制器类实现消息的发送功能示例代码如下所示 Controller
public class SendController {Autowiredprivate MqttProviderConfig providerClient;RequestMapping(/sendMessage)ResponseBodypublic String sendMessage(int qos,boolean retained,String topic,String message){try {providerClient.publish(qos, retained, topic, message);return 发送成功;} catch (Exception e) {e.printStackTrace();return 发送失败;}}
} 4.3 实现消费者
前面完成了生成者消息发布的模块接下来修改消费者模块spring-boot-starter-mqtt-consumer实现消息订阅、处理的功能。
4.3.1 导入依赖包
修改pom.xml 文件添加MQTT相关依赖具体示例代码如下所示 dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactIdversion5.3.2.RELEASE/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.4/version/dependency/dependencies 4.3.2 修改配置文件
修改application.yml配置文件增加MQTT相关配置。示例代码如下所示 spring:application:name: consumer#MQTT配置信息mqtt:#MQTT服务端地址端口默认为11883如果有多个用逗号隔开url: tcp://127.0.0.1:11883#用户名username: admin#密码password: public#客户端id不能重复client:id: consumer-id#MQTT默认的消息推送主题实际可在调用接口时指定default:topic: topic
server:port: 8085 4.3.3 消费者客户端配置
创建消费者客户端配置类MqttConsumerConfig读取application.yml中的相关配置并初始化创建MQTT的连接。示例代码如下所示 import javax.annotation.PostConstruct;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;Configuration
public class MqttConsumerConfig {Value(${spring.mqtt.username})private String username;Value(${spring.mqtt.password})private String password;Value(${spring.mqtt.url})private String hostUrl;Value(${spring.mqtt.client.id})private String clientId;Value(${spring.mqtt.default.topic})private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options new MqttConnectOptions();//是否清空session设置为false表示服务器会保留客户端的连接记录客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题若客户端和服务器之间的连接意外断开服务器将发布客户端的遗嘱信息options.setWill(willTopic,(clientId 与服务器断开连接).getBytes(),0,false);//设置回调client.setCallback(new MqttConsumerCallBack());client.connect(options);//订阅主题//消息等级和主题数组一一对应服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos {1,1};//主题String[] topics {topic1,topic2};//订阅主题client.subscribe(topics,qos);} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect(){try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic,int qos){try {client.subscribe(topic,qos);} catch (MqttException e) {e.printStackTrace();}}
} 4.3.4 消费者客户端消息回调
创建MqttConsumerCallBack类并继承MqttCallback实现相关消息回调事件示例代码如下图所示 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttConsumerCallBack implements MqttCallback{/*** 客户端断开连接的回调*/Overridepublic void connectionLost(Throwable throwable) {System.out.println(与服务器断开连接可重连);}/*** 消息到达的回调*/Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format(接收消息主题 : %s,topic));System.out.println(String.format(接收消息Qos : %d,message.getQos()));System.out.println(String.format(接收消息内容 : %s,new String(message.getPayload())));System.out.println(String.format(接收消息retained : %b,message.isRetained()));}/*** 消息发布成功的回调*/Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println(String.format(接收消息成功));}
} 4.3.5 创建Controller控制器实现MQTT连接的建立和断开
接下来创建Controller控制器MqttController并实现MQTT连接的建立和断开等方法。示例代码如下所示 import com.weiz.mqtt.config.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;Controller
public class MqttController {Autowiredprivate MqttConsumerConfig client;Value(${spring.mqtt.client.id})private String clientId;RequestMapping(/connect)ResponseBodypublic String connect(){client.connect();return clientId 连接到服务器;}RequestMapping(/disConnect)ResponseBodypublic String disConnect(){client.disConnect();return clientId 与服务器断开连接;}
} 4.4 测试验证
首先分别启动生产者spring-boot-starter-mqtt-provider 和消费者spring-boot-starter-mqtt-consumer两个项目打开浏览器输入地址http://localhost:18083/在EMQX管理界面可以看到连接上来的两个客户端。如下图所示 接下来调用生产者的消息发布接口验证消息发布是否成功。使用Pomstman调用消息发送接口http://localhost:8080/sendMessage 如下图所示 通过上图可以发现生产者模块已经把消息发送成功。接下来查看消费者模块验证消息是否处理成功。如下图所示 通过日志输出可以发现消费者已经成功接收到生产者发送的消息说明我们成功实现在Spring Boot项目中整合MQTT实现了消息的发布和订阅的功能。