自学内容网 自学内容网

MQTT知识要点

介绍

MQTT (Message Queuing Telemetry Transport) 是一种轻量级的发布/订阅消息协议,专为低带宽环境M2M而设计。是物联网(IoT)最常用的消息传递协议。

  • 轻量高效
  • 双向通信
  • 可以扩展以连接数百万台物联网设备。
  • 可靠的消息传递(支持3种QOS级别)
  • 支持不可靠的网络(支持持久会话)
  • 安全保证(支持TLS)

工作原理

连接

在这里插入图片描述

keepalive

如果在一定时间内(keep alive period)没有发送消息,客户端会发送PINGREQ数据包到服务器,服务器接收后返回PINGRESP,以此确认连接正常。

客户端设置了保活时间,服务器如果在客户端设置的保活时间的1.5倍仍没有收到客户端发送的PINGREQ数据包,会强制断开客户端的连接。

Retained Messages(保留消息)

通常,如果发布者向某个主题发布消息,而没有人订阅该主题,则代理会丢弃该消息。
然而,发布者可以通过设置保留消息标志(Retain Flag)来告诉代理保留该主题的最后一条消息。

Clean Sessions

如果是clean session,选项为true,客户端断开连接时,代理不会记住之前会话状态任何信息。
如果是no clean session,选项为false,代理将根据QoS级别保留客户端的订阅和未送达的消息

Last Will Messages

Last Will Messages(遗嘱消息)用于处理客户端意外断开连接的情况,通知订阅者由于网络中断导致发布者不可用。

当客户端连接到MQTT代理时,可以提前注册一条消息,该消息将在客户端意外断开时发送到指定的主题。这使得其他订阅此主题的客户端可以被通知客户端的异常断开。

Topics

Topic结构

MQTT topic的结构类似于文件系统中的文件夹和文件,使用正斜杠(/)作为分隔符。

  • 区分大小写
  • 使用 UTF-8 字符串。
  • 必须至少包含一个字符才有效。

$SYS 主题,默认系统主题都在这个下面。

topic由订阅或发布客户端创建,并且不是永久的。以下情况会创建topic

  • 有人订阅了某个topic
  • 某人向主题发布一条消息,并将保留消息设置为 True。

订阅多个topic时,可以使用通配符。发布只能向单个topic发布消息,不允许使用通配符

  • #(井号) – 多级通配符
    • (加号) -单级通配符

以下情况会删除topic

  • 当订阅该代理的最后一个客户端断开连接时,clean session为true。
  • 客户端以clean session为 True的方式连接broker时。

Topic使用

  • 由于主题区分大小写,因此仅使用小写。
  • 使用前缀分隔命令和响应主题,例如 command/ 和 response/
  • 不要使用带空格的主题。
  • 仅使用字母、数字和破折号
  • 在主题结构中包含路由信息。

发布与订阅

QOS 服务质量等级

MQTT 支持 3 个QOS级别 0、1、2。

  • QOS -0 – (At most once) 默认,不保证消息一定被传递。
  • QOS -1 – (At lease once) 保证消息传递,但可能会重复。
  • QOS -2 - (Exactly once) 保证消息传递,并且只传递1次。

不同级别消息在客户端与代理直接的消息流:

在这里插入图片描述

安装使用

安装

本地开发学习可以使用 Eclipse免费的公共 MQTT 代理

https://mqtt.eclipseprojects.io/

或者自己安装一个mqtt boker

docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4

开发(基于 Eclipse paho go client 库)

Eclipse paho支持的客户端库:https://eclipse.dev/paho/index.php?page=downloads.php

package main

import (
"fmt"
//import the Paho Go MQTT library
MQTT "github.com/eclipse/paho.mqtt.golang"
"os"
"time"
)

// define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}

var msgRcvd = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("Received message on TOPIC: %s  Messages: %s \n", msg.Topic(), msg.Payload())
}

func main() {
//create a ClientOptions struct setting the broker address, clientid, turn
//off trace output and set the default message handler
//设置broker url和 client ID
opts := MQTT.NewClientOptions().AddBroker("tcp://mqtt.eclipseprojects.io:1883")
opts.SetClientID("go-simple")
opts.SetDefaultPublishHandler(f)
opts.SetKeepAlive(5 * time.Second)

//create and start a client using the above ClientOptions
c := MQTT.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

//subscribe to the topic /go-mqtt/sample and request messages to be delivered
//at a maximum qos of zero, wait for the receipt to confirm the subscription

if token := c.Subscribe("go-mqtt/sample", 0, msgRcvd); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}

time.Sleep(300 * time.Second)

//unsubscribe from /go-mqtt/sample
if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

c.Disconnect(250)
}

参考來源:
http://www.steves-internet-guide.com/
https://mqtt.org/


原文地址:https://blog.csdn.net/zhangj1125/article/details/144338446

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!