西安制作网站公司简介,如何做一张旅游网站,库存网站建设定制,南昌建站模板1.工作及使用背景 工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析#xff0c;如电表、流量计、泵、控制器等物联网设备。 目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储#xff08;也会涉及收费问题#xff09;#xff0c;通过组态软件…1.工作及使用背景 工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析如电表、流量计、泵、控制器等物联网设备。 目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储也会涉及收费问题通过组态软件自带的转储工具将数据转储到关系型数据库如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。 但上述解决方案实际上是实现简单但是数据统计时机有潜在的偏差风险且逻辑设计非常别扭数据库压力大等问题理论上应该通过消息队列来接收实时数据参与计算的方式Web系统只负责展示计算统计之后的结果这样无论是时效还是数据准确性更容易保证实时数据存储的数据库压力也不存在可做数据校验用也可不用逻辑也不显别扭。 2.开发环境及工具
JDK1.8、maven、Mosquitto、IDEA、postman 3.框架结构及文件声明 因为我用的现成的框架所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。
MqttClientConnectorPool对外提供一个初始化的Mqtt客户端在服务启动时初始化MqttMsgSender对外提供一个可以执行消息发送的方法MqttMsgSubscriber初始化一个Mqtt客户端并根据配置订阅topicTestController接收web请求的调用消息发送用于测试BusinessApplicationStartup服务启动时执行调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待BusinessApplicationShutdown服务正常终止时调用关闭服务启动默认创建的Mqtt客户端MqttBrokerServerSpringBoot服务启动类 4.具体实现逻辑及代码
4.1 maven依赖
propertiesMQTTv3.version1.2.5/MQTTv3.version
/propertiesdependencyManagementdependenciesdependencygroupIdorg.eclipse.paho/groupIdartifactIdorg.eclipse.paho.client.MQTTv3/artifactIdversion${MQTTv3.version}/version/dependency/dependencies
/dependencyManagement或者直接使用
dependencygroupIdorg.eclipse.paho/groupIdartifactIdorg.eclipse.paho.client.MQTTv3/artifactIdversion1.2.5/version
/dependency
4.2 MqttClientConnectorPool
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient ! null){log.info(已存在我深深的脑海);return mqttClient;}try {// broker及连接信息String broker tcp://127.0.0.1:1883;String username admin;String password 123456;String clientId System.currentTimeMillis() ;//创建MQTT客户端指定broker、客户端id、消息持久策略mqttClient new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info(MqttClient 服务启动broker初始化);} catch (MqttException e){log.error(MqttClient connect Error:{}, e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error(MqttClient disconnect or close Error{}, e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient ! null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error(MqttClient disconnect or close Error{}, e.getMessage());e.printStackTrace();}}
}
4.3 MqttMsgSender
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error(MqttClient publish text info Error:{}!, e.getMessage());e.printStackTrace();}}
}4.4 MqttMsgSubscriber
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;Slf4j
public class MqttMsgSubscriber {String broker tcp://127.0.0.1:1883;String topic /deviceUp;String username admin;String password 123456;String clientId System.currentTimeMillis() ;int qos 1;public void readSubscribeTopicMessage(){try {MqttClient client new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {Overridepublic void connectionLost(Throwable throwable) {log.error(连接丢失);}Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info(topic为: topic);log.info(qos为: mqttMessage.getQos());log.info(消息内容为: new String(mqttMessage.getPayload()));}Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info(交付完成 ---Delivery complete!);// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error(MqttMsgSubscriber 连接启动异常{}, e.getMessage());} catch (Exception e){log.error(MqttMsgSubscriber 读取消息异常{}, e.getMessage());}}}4.5 TestController
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;Slf4j
RestController
RequestMapping()
public class TestController {GetMapping(/test/mqtt/{msg})public String testSendMqttMsg(PathVariable(msg) String msg){log.info(消息内容{}., msg);MqttClient mqttClient MqttClientConnectorPool.connectMQTT();MqttMsgSender sender new MqttMsgSender();String content { \deviceNo\: \ msg \, \val\: 232.5 };String topic /deviceUp;int qos 1;if (null ! mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info(MqttClient为空无法发送);return 失败;}return 成功;}}4.6 BusinessApplicationStartup
import 包路径可以删掉这一行手动导入.MqttClientConnectorPool;
import 包路径可以删掉这一行手动导入.MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;Slf4j
Order(10)
Component
public class BusinessApplicationStartup implements ApplicationRunner {Overridepublic void run(ApplicationArguments args) throws Exception {log.info(MqttClientConnectorPool Startup);MqttClientConnectorPool.connectMQTT();log.info(MqttClientConnectorPool recoveryAllJob Over !);log.info(MqttMsgSubscriber Startup);// 先订阅等待MqttMsgSubscriber subscriber new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}4.7 BusinessApplicationShutdown
import 包路径可以删掉这一行手动导入.MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;Slf4j
Component
public class BusinessApplicationShutdown implements ApplicationListenerContextClosedEvent {Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info(服务终止 shutdown hook, ContextClosedEvent);MqttClientConnectorPool.closeStaticClient();}}4.8 MqttBrokerServer
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;EnableScheduling
SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}5.其他备注
5.1 需要Mqtt(Broker)服务器 如果是直接使用示例代码的Mqtt服务器Broker配置需要在自己电脑上安装Mqtt服务器如mosquitto、EMQX等具体自行搜索或者使用公用的Mqtt服务器我没测试试过
// 注意当前Broker本人未测试
String broker tcp://broker.emqx.io:1883;
String topic mqtt/test;
String username emqx;
String password public;
5.2 调试地址
如果配置文件没配置[server.servlet.context-path]就不需要我自己/backend这一段 6.参考文章 MQTT协议介绍及Java教程 https://baijiahao.baidu.com/s?id1801542244354727565wfrspiderforpc 7.喜欢作者
暂无