哈尔滨信息工程学院地址,网站排名优化培训哪家好,站长之家域名ip查询,劲松做网站的公司FluxMQ#xff1a;新一代的高性能MQTT代理服务器
前言
FLuxMQ是一款基于java开发#xff0c;支持无限设备连接的云原生分布式物联网接入平台。FluxMQ基于Netty开发#xff0c;底层采用Reactor3反应堆模型#xff0c;具备低延迟#xff0c;高吞吐量#xff0c;千万、亿级…FluxMQ新一代的高性能MQTT代理服务器
前言
FLuxMQ是一款基于java开发支持无限设备连接的云原生分布式物联网接入平台。FluxMQ基于Netty开发底层采用Reactor3反应堆模型具备低延迟高吞吐量千万、亿级别设备连接方便企业快速构建其物联网平台与应用。
FluxMQ官网https://www.fluxmq.com
FluxMQ演示系统http://demo.fluxmq.com/
丰富的功能 通配符
通配符解释#可以匹配零个或多个层级的主题。只能出现在主题末尾。可以匹配一个层级的主题。可以出现在主题的任何位置。 FluxMQ支持两种通配符和#。表示匹配一个层级#表示匹配零个或多个层级。下面是一些例子 井号 #
井号通配符表示可以匹配零个或多个层级的主题。它只能出现在主题的末尾例如 home/living_room/#。 用法示例如果一个订阅者订阅了主题 home/living_room/#那么它将接收到诸如 home/living_room/light、home/living_room/temperature 等所有以 home/living_room/ 开头的消息。
加号
加号通配符表示可以匹配一个层级的主题。它可以出现在主题的任何位置例如 home//light。 用法示例如果一个订阅者订阅了主题 home//light那么它将接收到诸如 home/living_room/light、home/kitchen/light 等层级为 home/ 下且以 /light 结尾的消息。
共享订阅
FluxMQ支持共享订阅单个topic共享订阅集群支持10W/s吞吐量下面吗我们介绍下共享订阅的使用方式
如何使用共享订阅
通过Topic的订阅规则可以指定订阅组名称从而实现共享订阅。例如订阅规则为$share/topic1则订阅组名称为DEFAULT,订阅主题为topic1。分组共享订阅是指将订阅组的消息分发均匀地分发给订阅组成员。在分组共享订阅中订阅同一主题的客户机依次接收此主题下的消息。同一消息不会发送给多个订阅客户端从而实现多个订阅客户端之间的负载均衡。订阅的规则为$queue/group1/topic1则订阅组名称为group1,订阅主题为topic1。
规则引擎
FluxMQ Rule Engine (以下简称规则引擎) 用于配置FluxMQ 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的 “配置式” 的业务集成方案简化了业务开发流程提升用户易用性降低业务系统与 FluxMQ 的耦合度也为 FluxMQ 的私有功能定制提供了一个更优秀的基础架构。 FluxMQ 在 消息发布或事件触发 时将触发规则引擎满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息 Sql支持
规则的 SQL 语句基本格式为:
SELECT 字段名 FROM 事件类型 [WHERE 条件]FROM 子句将规则挂载到某个事件类型上可多选SELECT 子句用于对数据进行变换并选择出需要的字段WHERE 子句用于对 SELECT 选择出来的某个字段施加条件过滤
## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 a 和 b:SELECT a, b FROM $EVENT.PUBLISH# 选取 username 为 abc 的建立连接消息输出结果为所有可用字段:SELECT * FROM $EVENT.CONNECT WHERE auth.username abc## 选取 clientId 为 abc 的终端发来的消息输出结果将只有 cid 一个字段。SELECT clientId as cid FROM $EVENT.PUBLISH WHERE clientId abc## 选取 qos 为 1 的发布消息输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段所有消息发布事件中的可用字段 (比如 clientId、topic 等) 仍然可以在 WHERE 语句中使用:SELECT clientId as cid FROM $EVENT.PUBLISH WHERE qos 1## 但下面这个 SQL 语句就不能工作了因为变量 xyz 不是消息发布事件中的可用字段:SELECT clientId as cid FROM $EVENT.PUBLISH WHERE xyz abc比较符号
函数名函数作用返回值大于true/false小于true/false小于等于true/false大于等于true/false不等于true/false!不等于true/false比较两者是否完全相等。可用于比较变量和主题true/false~比较主题(topic)是否能够匹配到主题过滤器(topic filter)。只能用于主题匹配true/false
SQL 语句示例
基本语法举例
从 topic 为 “t/a” 的消息中提取所有字段:
SELECT * FROM $EVENT.PUBLISH WHERE topic t/a从 topic 为 “t/a” 或 “t/b” 的消息中提取所有字段:
SELECT * FROM $EVENT.PUBLISH WHERE topic t/a or topic t/b从 topic 能够匹配到 ‘t/#’ 的消息中提取所有字段。
SELECT * FROM $EVENT.PUBLISH WHERE topic ~ t/#从 topic 能够匹配到 ‘t/#’ 的消息中提取 qos、messageId 和 clientId 字段:
SELECT qos, messageId, clientId FROM $EVENT.PUBLISH WHERE topic ~ t/#从 建立连接 消息中提取 username 字段并且筛选条件为 username ‘fluxmq’:
select auth.username as username from $EVENT.CONNECT where auth.username fluxmq从任意 topic 的 JSON 消息体(payload) 中 提取 x 字段并创建别名 x 以便在 WHERE 子句中使用。WHERE 子句限定条件为 x 1。下面这个 SQL 语句可以匹配到消息体 {“x”: 1}但不能匹配到消息体 {“x”: 2}:
SELECT payload.x as x FROM $EVENT.PUBLISH WHERE payload.x 1类似于上面的 SQL 语句但 **嵌套地提取 **消息体中的数据下面的 SQL 语句可以匹配到 JSON 消息体 {“x”: {“y”: 1}}:
SELECT payload FROM $EVENT.PUBLISH WHERE payload.x.y 1在 clientId ‘c1’ 连接成功时提取其来源 IP 地址:
SELECT clientIp FROM $EVENT.CONNECT WHERE clientId c1筛选所有订阅 ‘t/#’ 主题 且 订阅级别为 QoS 1 的 clientId:
SELECT clientId FROM $EVENT.SUBSCRIBE WHERE topic ~ t/# and qos 1筛选所有订阅主题能匹配到 ‘t/#’ 且订阅级别为 QoS 1 的 clientId。注意与上例不同的是这里用的是主题匹配操作符 ‘~’所以会匹配订阅 ‘t’ 或 ‘t//a’ 的订阅事件:
SELECT clientId FROM $EVENT.SUBSCRIBE WHERE topic ~ t/# and qos 1从topic包含fluxmq字符的消息中提取所有字段使用 ‘like’ 语法包含的字符用%包裹
SELECT * FROM $EVENT.PUBLISH WHERE topic like %fluxmq%使用常量字段用双引号包裹字符串作为常量值as后面跟字段作为输出的字段名
SELECT *, test as event FROM $EVENT.PUBLISH可以直接使用Java的String API如 startsWithendsWith获取以’test’开头的所有Publish消息
select * from $EVENT.PUBLISH where topic.startsWith(test)提示
FROM 子句后面的主题需要用双引号 “”或者单引号 ‘’ 引起来。WHERE 子句后面接筛选条件如果使用到字符串需要用单引号 ‘’ 引起来。FROM 子句里如有多个事件需要用逗号 “,” 分隔。例如 SELECT * FROM “t/1”, “t/2” 。可以使用使用 “.” 符号对json字段进行嵌套选择。尽量不要给 payload 创建别名否则会影响运行性能。即尽量不要这么写SELECT payload as p
SQL函数列表
函数名说明json- 使用在SQL语句中可以将嵌套的Json结构转换成字符串- 使用在支持模板替换的想象中比如数据库保存的SQLKAFKA的TOPIC等地方bytes将对象转成字节数组会先将对象JSON化再转成UTF8格式的字节数组int8将对象转为Int8类型相当于byteint16将对象转为Int16类型相当于shortint32将对象转为Int32类型相当于intint64将对象转为Int64类型相当于longtoDouble将对象转为double类型hexStr将对象转成HEX字符串 会先将对象JSON化再转成对象的16进制字符串date格式化时间字符串yyyy-MM-dddatetime格式化时间字符串yyyy-MM-dd HH:mm:ssdateToTimestampyyyy-MM-dd 时间字符串转成时间戳datetimeToTimestampyyyy-MM-dd HH:mm:ss 时间字符串转成时间戳uuid生成32位随机小写字符串uuidUpper生成32位随机大写字符串isBytes判断是否是字节数组isJson判断是否是json
SQL事件
规则的 SQL 语句可以处理事件(发布消息、客户端上下线、客户端订阅等)FROM 子句后面跟事件主题。事件主题以 $EVENT/ 开头比如 E V E N T . P U B L I S H , EVENT.PUBLISH, EVENT.PUBLISH,EVENT.SUBSCRIBE 。
事件名称事件主题名释义发布事件$EVENT.PUBLISH发布消息订阅事件$EVENT.SUBSCRIBE订阅成功消息取消订阅事件$EVENT.UNSUBSCRIBE取消订阅成功消息发布回复事件$EVENT.ACK消息接收成功并回复心跳事件$EVENT.PING连接保活心跳消息取消连接事件$EVENT.DISCONNECT客户端主动断开连接连接断开事件$EVENT.CLOSE服务端关闭连接建立连接事件$EVENT.CONNECT连接成功离线消息事件$EVENT.OFFLINE离线期间接收的消息
支持的数据源
[✔] MySQL[✔] PostgreSQL[✔] Oracle[✔] SQL Server[✔] Redis[✔] TDengine[✔] ClickHourse[✔] Kafka[✔] RabbitMQ[✔] RocketMQ[✔] Pulsar[✔] MQTT[✔] Mogodb[✔] Log
多协议
目前已经支持的协议如下
[✔] Websocket[✔] Ocpp[✔] GBT32960[✔] JT808[✔] V2C[✔] I1[✔] Coap
通过控制台启动协议插件后默认会在集群每个节点气动对应的协议插件,依托于FluxMQ,该协议天然支持集群订阅的规则如下 下行指令通过MQTT下发给具体的设备,TOPIC规则为{协议名称}/down/{vin} 上行是设备上报的报文,TOPIC规则为{协议名称}/up/{vin} 脚本 脚本的处理时机是在MQTT事件处理器之前因此脚本的适用于修改、解析、加解密MQTT传输的Payload。脚本默认匹配主题支持通配符处理Payload可以直接返回byte[]或者String。 Javascript脚本 function convert(topic, payload){return payload;}Lua脚本
function convert(topic, payload)return payload
endGroovy脚本
def convert(String topic, byte[] payload){return payload
}FluxMQ免费推广 FluxMQ默认提供免费的接入License许可直接参考我们文档下载即可。