03 - 尚硅谷 - MQTT Dashboard
0.视频地址
1.Dashboard简介
EMQX 提供了一个内置的 管理控制台
,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。
访问地址:http://ip:18083
首次登录访问账号:admin/public
重置密码:
./bin/emqx ctl admins passwd <Username> <Password>
2.访问控制
2.1 认证
就是验证客户端的身份。
创建认证器
创建认证器大致步骤:
1、选择认证方式
2、配置数据源
3、配置数据源相关参数
目前 EMQX 提供了三种认证方式,包含有:
-
Password-Based,使用客户端 ID 或用户名加密码的认证方式;
-
JWT,客户端可以在用户名或密码字段中携带 JWT token 来进行认证;
-
SCRAM,MQTT 5.0 中的增强认证,可以实现对客户端和服务器的双向认证。
Password-Based
当选择 Password-Based
的认证方式后,用户可以选择存储认证数据的数据库或提供认证数据功能的 HTTP 服务器,数据库包含两类:
-
EMQX 的内置数据库,即选择
Built-in Database
; -
外部数据库,支持选择并连接到一些主流数据库,包括:
MySQL
、PostgreSQL
、MongoDB
、Redis
等。
除数据库外,还可以直接使用能够提供认证数据的 HTTP 服务,即选择 HTTP Server
。
JWT
如果选择了 JWT 的认证方式,将无需选择数据源。
SCRAM
MQTT 5.0 中的增强认证功能,如果选择了该认证方式的话,目前仅提供了使用 Built-in Database
数据源。
配置参数说明
内置数据库
例如使用内置数据库的话需要选择使用用户名还是客户端 ID,选择密码的加密方式等;如果是选择了 MQTT 5.0 的增强认证,使用内置数据库的话,只需要配置加密方式即可。
外部数据库
选择外部数据库的话,需要配置能访问到的数据库地址,数据库名称,用户名密码,以及认证相关配置,填写如何从数据库中获取数据的 SQL 语句或其它查询语句等。以 MySQL 为例:
数据库环境准备:
# 创建数据库
docker run -d \
-p 3306:3306 \
-v mysql8_conf:/etc/mysql/conf.d \
-v mysql8_data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=1234 \
--name mysql8 \
--restart=always \
--privileged=true \
mysql:8.0.30
# mqtt_user.sql脚本
/*
Navicat Premium Data Transfer
Source Server : 147-3306
Source Server Type : MySQL
Source Server Version : 80029 (8.0.29)
Source Host : 192.168.136.147:3306
Source Schema : mqtt_user
Target Server Type : MySQL
Target Server Version : 80029 (8.0.29)
File Encoding : 65001
Date: 13/10/2024 08:43:32
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for mqtt_user
-- ----------------------------
DROP TABLE IF EXISTS `mqtt_user`;
CREATE TABLE `mqtt_user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`password_hash` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`salt` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
HTTP Server
选择使用 HTTP 服务的话,需要配置请求该 HTTP 服务的请求方式,POST 或 GET 方法。请求地址 URL,注意 URL 内需要填写协议是 http 或 https,如果有端口号的话需要在 URL 中携带端口号。然后是 HTTP 请求的 Headers 配置,携带认证信息的内容需要和请求 HTTP 服务的数据格式相同,然后配置到 Body
字段中,例如将 username
和 password
填写到 JSON 数据中。
认证列表
创建认证器成功后,可以在认证列表中查看和管理。认证列表的每一栏都可以通过鼠标来拖动调整顺序
,或通过操作栏调整列表顺序,顺序对于认证列表来说有一定的重要性,因为EMQX允许创建多个认证器,这些认证器将按照在认证链中的位置顺序运行
,如果在当前认证器中未检索到匹配的认证信息,将会切换至链上的下一个认证器继续认证过程。
认证链的认证流程
-
当前认证器执行时检索到了匹配的认证信息,例如用户名一致:
1.1 密码完全匹配,则客户端认证通过,允许连接。
1.2 密码无法匹配,则客户端认证失败,拒绝连接。
-
当前认证器执行时没有检索到匹配的认证信息,例如数据源中没有查找到数据:
2.1 当前认证器之后还有认证器:忽略认证,交由下一认证器继续认证。
2.2 当前认证器已经是链中最后一个认证器:客户端认证失败,拒绝连接。
用户管理
对于使用`内置数据库`的用户来说,在认证列表页点击 `用户管理`,可以来到用户管理页面,在该页面,可以管理认证信息,例如添加或删除用户名和密码,也可以通过下载模版,在模版内填充好相关的认证信息,点击 `导入` 来批量创建认证相关的用户信息。
客户端连接
当在MQTT的服务端创建完认证器以后,那么此时客户端在进行连接的时候就需要出示认证信息,如果未出示认证信息,那么此时就会报错Error: Connection refused: Bad User Name or Password
2.2 授权
授权简介
通常情况下,认证只是验证了客户端的身份是否合法,而该客户端是否具备发布、订阅某些主题的权限,还需要由授权系统
来判断。在 EMQX 中,授权是指对 MQTT 客户端的发布和订阅操作进行权限控制。
授权检查器链:
1、EMQX支持多种授权检查机制:基于ACL文件(默认配置)、基于内置数据库、基于MySQL进行授权、基于 MongoDB 进行授权 、基于 PostgreSQL 进行授权 、基于Redis进行授权、基于HTTP应用进行授权 ...
2、EMQX 支持用户通过配置多个授权检查器构成授权链
,以实现更灵活的授权检查。EMQX 将按照授权链中配置的检查器顺序依次执行授权检查。如果在当前授权检查器中未检索到权限数据,将会切换至链上的下一个启用的授权检查器继续权限检查。
ACL文件授权演示
1、ACL授权说明
EMQX 支持基于 ACL 文件中存储的规则进行授权检查。您可在文件中配置多条授权检查规则,在收到客户端的操作请求后,EMQX 会按照从上到下的顺序进行授权规则匹配;在成功匹配到某条规则后,EMQX 将按设定允许或拒绝当前请求,并停止后续规则的匹配。
2、文件格式介绍
基本语法和概念如下:
-
一组授权规则一个花括号包起来,花括号内的各个元素用逗号分隔
-
每条规则应以
.
结尾 -
注释行以
%%
开头,在解析过程中会被丢弃
代码示例:
%% 允许用户名是 dashboard 的客户端订阅 "$SYS/#" 这个主题
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
%% 允许来自127.0.0.1 的用户发布和订阅 "$SYS/#" 以及 "#"
{allow, {ipaddr, "127.0.0.1"}, all, ["$SYS/#", "#"]}.
%% 拒绝其他所有用户订阅 `$SYS/#`,`#` 和 `+/#` 主题
{deny, all, subscribe, ["$SYS/#", {eq, "#"}, {eq, "+/#"}]}.
%% 如果前面的规则都没有匹配到,则允许所有操作
%% 注意:在生产环境中,最后一条规则应该设置为 `{deny, all}`,并且配置 `authorization.no_match = deny`
{allow, all}.
在每一组授权规则中:
1、第一个元素表示该条规则对应的权限;可选值:
- allow(允许)
- deny(拒绝)
2、第二个元素用来指定适用此条规则的客户端,比如:
- {username, "dashboard"}:用户名为 dashboard 的客户端;也可写作{user, "dashboard"}
- {username, {re, "^dash"}}:用户名匹配 正则表达式 ^dash 的客户端
- {clientid, "dashboard"}:客户端 ID 为 dashboard 的客户端,也可写作{client, "dashboard"}
- {clientid, {re, "^dash"}}:客户端 ID 匹配 正则表达式 ^dash 的客户端
- {ipaddr, "127.0.0.1"}:源地址为 127.0.0.1 的客户端;支持 CIDR 地址格式。注意:如果 EMQX 部署在负载均衡器后侧,建议为 EMQX 的监听器开启 proxy_protocol 配置 ,否则 EMQX 可能会使用负载均衡器的源地址。
- {ipaddrs, ["127.0.0.1", ..., ]}:来自多个源地址的客户端,不同 IP 地址之间以 `,` 区分
- all:匹配所有客户端
- {'and', [Spec1, Spec2, ...]} :满足列表中所有规范的客户端。
- {'or', [Spec1, Spec2, ...]} :满足列表中任何规范的客户端。
3、第三个元素用来指定该条规则对应的操作:
- publish:发布消息
- subscribe:订阅主题
- all:发布消息和订阅主题
- 从 v5.1.1 版本开始,EMQX 支持检查发布与订阅操作中的 QoS 与保留消息标志位,您可以在第三个元素中加上 qos 或者 retain 来指定检查的 QoS 或保留消息
标志位,例如:
- {publish, [{qos, 1}, {retain, false}]}:拒绝发布 QoS 为 1 的保留消息
- {publish, {retain, true}}:拒绝发布保留消息
- {subscribe, {qos, 2}}:拒绝以 QoS2 订阅主题
4、第四个元素用于指定当前规则适用的 MQTT 主题,支持通配符(主题过滤器),可以使用`主题占位符`:
- "t/${clientid}":使用了主题占位符,当客户端 ID 为 emqx_c 的客户端触发检查时,将精确匹配 t/emqx_c 主题
- "$SYS/#":通过通配符匹配 $SYS/ 开头的所有主题,如 $SYS/foo、 $SYS/foo/bar
- {eq, "foo/#"}:精确匹配 foo/# 主题,主题 foo/bar 将无法匹配,此处 `eq` 表示全等比较(equal)
另外还有 2 种特殊的规则,通常会用在 ACL 文件的末尾作为默认规则使用。
- {allow, all}:允许所有请求
- {deny, all}:拒绝所有请求
3、配置演示
在Dashboard的中权限配置文件中添加如下的配置:
# 拒绝任意的客户端订阅test/#这种规则的主题
{deny, all, subscribe, ["test/#"]}.
基于内置数据库授权演示
通过 Dashboard 配置:
1、在 EMQX Dashboard 页面,点击左侧导航栏的 访问控制 -> 授权,在 授权 页面,添加 Built-in Database作为 数据源, 点击下一步 进入 配置参数 页签。由于无需配置其他参数,可直接点击创建完成配置。
2、通过 Dashboard 配置:在 Dashboard 的 授权页面,点击 Built-in Database 数据源对应的 操作 栏下的 权限管理,即可进行授权检查规则的配置。您可根据需要从客户端 ID、用户名或直接从主题角度设置授权检查。
- 客户端 ID:见 客户端 ID 页签,指定适用此条规则的客户端
- 用户名:见 用户名 页签,指定适用此条规则的用户名
- 权限:是否允许当前客户端/用户的某类操作请求;可选值:允许、拒绝
- 操作:配置该条规则对应的操作;可选值:发布、订阅、发布与订阅
- 主题:配置该条规则对应的主题
EMQX 支持针对单个客户端或用户配置多条授权检查规则,您可通过页面的 上移、下移 调整不同规则的执行顺序和优先级。
注意:可以通过主题订阅来验证消息是否发送成功。
3.黑名单与连接抖动检测
3.1 黑名单
EMQX 为用户提供了黑名单功能来禁止
某些客户端的访问,除了可以封禁客户端 ID
以外,还支持直接封禁用户名甚至 IP
地址。
封禁还可以通过规则来实现,这些规则包括:
1、使用正则表达式匹配客户端标识符和用户名。
2、使用 CIDR 范围匹配源 IP 地址。
创建黑名单:
1. 在 EMQX Dashboard 页面,点击左侧导航栏的访问控制 -> 黑名单,在随即打开的页面,单击创建。
2. 在弹出的创建页面,按照页面提示配置:
2.1 禁用对象:通过下拉列表选择封禁客户端的方式,可以指定客户端 ID、用户名、IP 地址、客户端 ID 模式、用户名模式或 IP 地址范围,然后提供相应的值。
2.2 到期时间(可选):设置该规则的到期时间。
2.3 原因(可选):说明为该对象设置黑名单的原因。
3. 点击创建完成配置。
3.2 连接抖动检测
EMQX 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。
需要注意的是,连接抖动检测功能只会封禁客户端 ID,并不封禁用户名和 IP 地址
,即该机器只要更换客户端标 ID 就能够继续登录。
抖动检测功能默认关闭,您可以通过 EMQX Dashboard 或配置文件开启该功能。
通过 Dashboard 开启:
前往 Dashboard,从左侧导航菜单点击访问控制 -> 连接抖动 进入连接抖动页面。通过点击切换按钮启用抖动检测功能。
1、检测时间窗口:您可以指定系统监视客户端抖动行为的持续时间。默认值为 1 分钟。
2、最大断连次数:您可以指定在检测窗口时间内允许的 MQTT 客户端的最大断开连接次数。它允许您设定准确的标准来识别和响应表现出抖动行为的客户端。默认值为
15。
3、封禁时长:您可以指定客户端被封禁的时间长度。默认值为 5 分钟。
点击保存更改以完成设置。
4.数据集成
4.1 数据集成概述
思考问题:如何将一个物联网设备产生的数据传输到业务系统中?
上述方案的弊端:较为麻烦
数据集成:为 EMQX 引入了与外部数据系统的连接,从而以实现设备与其他业务系统的无缝集成。
EMQX的数据集成功能不单单可以快速的将物联网设备产生的数据传递到业务系统中,也可以和其他的外部数据系统进行集成,实现数据的快速传输。
4.2 工作原理介绍
sink和source组件:
数据集成使用 Sink 与 Source 组件与外部数据系统对接。
1、Sink(出) 用于将消息发送到外部数据系统,例如 MySQL、Kafka 或 HTTP 服务等。
2、Source(入) 则用于从外部数据系统接收消息,例如 MQTT、Kafka 或 GCP PubSub。
连接器:
连接器负责与外部数据系统的连接,用户可以为不同的外部数据系统创建不同的连接器,一个连接器可以为多个 Sink/Source 提供连接。
规则引擎:
规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配数据集成无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则的组成:规则描述了 数据来源、数据处理过程、处理结果去向 三个方面:
1、数据来源:规则的数据源可以是消息或事件,也可以是外部的数据系统 (source)。规则通过 SQL 的 FROM 子句指定数据的来源;
2、数据处理过程:规则通过 SQL 语句和函数来描述数据的处理过程。SQL 的 WHERE 子句用于过滤数据,SELECT 子句以及 SQL 函数用于提取和转换数据;
3、处理结果去向:规则可以定义一个或多个动作来处理 SQL 的输出结果。如果 SQL 执行通过,规则将按顺序执行相应的动作,比如将处理结果存储到数据库、或者重新发布到另一个 MQTT 主题等。支持的动作如下:
- 消息重发布:将结果发布到指定 MQTT 主题
- 控制台输出:将结果输出到控制台或日志中
- 发送到各类 Sink:将结果发送到外部数据系统中,如 MQTT 服务,Kafka,PostgreSQL 等
4.3 数据集成入门
需求:将客户端发往 't/a' 主题中的消息输出到EMQX的控制台
具体步骤:
1、进入到Dashboard中,依次点击"集成" ----> "规则" ----> "创建" 进入到创建规则的表单页面
2、配置好规则source和sink组件以后,可以点击对规则进行调试
3、启动MQTTX客户端和服务端建立连接,并且向t/a
主题发布消息,查看EMQX控制台的日志输出
4.4 连接器使用
案例一
需求:将客户端发往't/b'主题中的消息输出到EMQX的控制台和Redis中
# Docker部署Redis
docker run -d -p 6379:6379 --restart=always \
-v redis_config:/etc/redis/config \
-v redis_data:/data \
--name redis redis:7.0.10 \
redis-server /etc/redis/config/redis.conf
#redis启动的使用 redis-server 自定义加载一个配置文件
# 在redis_config数据卷所对应的磁盘目录下创建一个redis.conf文件,文件的内容如下所示
appendonly yes # 开启持久化, redis的持久化方式:rdb、aof
port 6379# 配置Redis进程对应的端口号为6379
requirepass 1234# 配置Redis访问密码
bind 0.0.0.0# 允许任意的客户端进行链接访问
具体步骤:
1、进入到Dashboard中,依次点击"集成" ----> "规则" ----> "创建" 进入到创建规则的表单页面,点击动作输出
添加一个sink
备注:
Redis的命令模板:
HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
2、添加完毕以后效果如下图所示
也可以通过flow设计器查看对应的拓扑图
3、使用MQTTX向't/b'主题中发送消息进行测试,观察Redis中的数据状态
案例二
需求:将发往Kafka中的test_mqtt_topic
主题中的消息输出到EMQX的控制台和Redis中
具体步骤:
1、在Kafka中创建test_mqtt_topic
主题
2、进入到Dashboard中,依次点击"集成" ----> "规则" ----> "创建" 进入到创建规则的表单页面,点击数据输入添加一个source
3、在添加source动作的页面,添加连接器"+"号,添加kafka的连接器
4、在创建规则的表单页面,点击动作输出
添加控制台输出和Redis输出的sink
Redis的命令模板如下所示:
HSET kafka_mqtt:${topic} offset ${offset} value ${value}
命令参数可以通过控制台输出进行确定。
5、向Kafka中的`test_mqtt_topic`发送消息
4.5 Webhook
Webhook简介
Webhook 提供了一种将 EMQX 客户端消息和事件集成到外部 HTTP 服务器的方法。
Webhook 是 EMQX 中开箱即用的功能。当客户端向特定主题发布消息,或执行特定操作时就会触发 Webhook,将事件数据和消息数据转发到预设的 HTTP 服务器中。
Webhook演示
具体步骤:
1、定义http的请求接口
@RestController
@RequestMapping(value = "/webHook")
public class WebHookController {
@PostMapping(value = "/notify")
public void notify(@RequestBody Map<Object , Object> body) {
System.out.println(body);
}
}
2、在Dashboard中创建Webhook
3、通过MQTTX向 a/1
主题发布消息,观察http服务控制台输出
5.日志管理
5.1 日志简介
通过 EMQX 的日志功能,您可查看`客户端访问、操作系统或网络异常等问题`,如登录错误,异常访问,性能故障等等,并基于日志信息进行问题排查或系统性能优化。
EMQX 支持两种不同的日志输出方式:
1、控制台输出日志(默认值)
2、文件输出日志
日志级别:
EMQX 日志包含 8 个等级,默认为 warning 级别,由低到高分别为:
debug < info < notice < warning < error < critical < alert < emergency
每一种日志输出的内容如下所示:
EMQX只会输出比配置日志级别高的日志数据。
5.2 日志配置
通过EMQX Dashboard 可以方便的修改日志配置。保存修改后将立即生效,无需重启节点。点击左侧导航栏的 管理-> 日志。选择相应的页签配置控制台输出日志或文件输出日志。
控制台日志配置
配置控制台日志处理进程的选项:
1、启用日志处理进程
2、日志级别
3、日志格式类型
4、时间戳格式
完成配置后,点击 保存更改。
文件输出日志配置
在日志页面,选择文件日志页签:
在文件日志启用后,日志目录下会有如下几种文件:
- emqx.log.N: 以 emqx.log 为前缀的文件为日志文件,包含了 EMQX 的所有日志消息。比如 `emqx.log.1`、`emqx.log.2` ...
- emqx.log.siz 和 emqx.log.idx: 用于记录日志滚动信息的系统文件,请不要手动修改。
原文地址:https://blog.csdn.net/2301_76354366/article/details/143943638
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!