golang的MQTT的连接操作
最近在使用golang开发后端服务的时候,因为需要和物联网设备进行数据交互,由于需要终端主动上报数据,也需要平台主动下发指令,所以我们选择了mqtt进行通信。
1、MQTT通信逻辑
MQTT采用的是订阅和发布的方式,数据是通过广播的方式,客户端A、B、C订阅了一个主题后,主题客户端发送数据的时候,A、B、C客户端都会收到广播出来的数据。那么当我们物联网设备需要向平台发送消息数据的时候,我们平台就订阅物联网设备发布的主题,然后设备发布消息后,平台收到订阅发布的消息,然后平台再对消息进行逻辑处理;当平台想给物联网设备发送消息的时候,物联网设备需要订阅平台发布的主题,然后平台发送广播消息,物联网设备就收到平台发送的消息。
还有就是MQTT通信需要提前建立一个MQTT服务器,这个服务是MQTT消息的转发服务,他负责主题的管理,客户端端的管理等,我们项目使用的开源MQTT服务EMQX搭建的。
2、基于golang的代码实现
基于golang实现的MQTT服务连接部分代码如下所示:
import (
"fmt"
"crypto/tls"
"crypto/x509"
"io/ioutil"
mqtt "github.com/eclipse/paho.mqtt.golang"
}
type jellyMessgae struct {
Message int `json:"message"`
Data interface{} `json:"data"`
}
type jellyCommand struct {
Command int `json:"command"`
Data interface{} `json:"data"`
}
var g_client *mqtt.Client
func MQTTInit() {
var broker = "192.168.10.138"
var port = 8883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("mqtts://%s:%d", broker, port))
newclientid := "Mqttclientid-" + time.Now().Format("2006-01-02-15:04:05")
opts.SetClientID(newclientid)
opts.SetUsername("admin")
opts.SetPassword("admin@123")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.SetAutoReconnect(true)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
tlsConfig := NewTlsConfig()
opts.SetTLSConfig(tlsConfig)
client := mqtt.NewClient(opts)
g_client = &client
if token := (*g_client).Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
func NewTlsConfig() *tls.Config {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile("./config/ca.crt")
if err != nil {
fmt.Println(err.Error())
}
certpool.AppendCertsFromPEM(ca)
return &tls.Config{
RootCAs: certpool,
InsecureSkipVerify: true}
}
// MQTT客户端连接Handle
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected MQTT Server!!!")
//连接成功后,订阅主题,订阅后,EMQX才新建一个对应的主题
token := client.Subscribe("message_topic", 1, nil)
token.Wait()
fmt.Println("Subscribed to topic: ", "message_topic")
}
// MQTT最后一次链接Handle
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
//这里可以做自定义重连机制
}
// MQTT客户端订阅消息处理回调Handle
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//处理agent传上来的消息
var pmessage jellyMessgae
json.Unmarshal(msg.Payload(), &pmessage)
fmt.Println("Received message from topic: ", msg.Topic(), "message = ", pmessage.Message)
//处理你接收到的数据
// add your code
//给终端发送MQTT消息
var mesg jellyCommand
//组装需要发送的数据
//add your code
resByte, err := json.Marshal(mesg)
if err != nil {
fmt.Println("[MQTTCommand_AgentUpdate] json.Marshal error:", err)
return
}
MQTTCommand_Publish("command_topic", resByte)
}
//平台发送命令到物联网设备的MQTT消息
func MQTTCommand_Publish(topic string, data interface{}) {
fmt.Println("[MQTTCommand_Publish]: ", topic)
token := (*g_client).Publish(topic, 0, false, data)
token.Wait()
}
简单解析下上面的代码:
- 代码不能直接运行,因为没有定义package包,需要将内容拷贝到你自己的测试代码里面去
- 代码里面使用的是mqtts来连接的mqtt服务器,服务器使用mqtts才能进行连接,客户端需要配置一个ca证书,证书可以是自签证书也可以是CA机构颁发的证书,示例代码里面的证书使用的是ca.crt自签证书
- 不管是发布消息还是接收消息,都需要使用一个唯一的客户端id去连接MQTT服务,然后再进行订阅、发布、接收等操作,示例代码中使用的是一个字符串加时间字符串来作为客户端ID
- 代码里面订阅了一个叫message_topic的主题,平台订阅了这个主题后,物联网设备上报消息的时候,平台就可以收到消息,解析消息的代码也给出,具体怎么处理数据,大家自己修改代码
- 平台通过同一个客户端ID去发送数据,主题为command_topic,发送的数据定义了一个结构体数据,用户自行组合数据进行数据的发送,物联网设备订阅了该主题的就会收到此数据
- 为了简便展示,示例代码里面的逻辑是当平台收到主题为message_topic的消息后,马上就往主题为command_topic发送了数据。
3、小结
- EMQX服务的主题不需要去创建和取消,当客户端发送消息,附带的主题时,服务会自动创建一个主题
- 连接MQTT服务的客户端ID不能重复,否则会报错,无法连接到服务端
- MQTT消息发送是广播形式,用户可以自己定义一些参数,标识数据是发给所有物联网终端还是发给单独的个别物联网终端
- 代码示例中的broker是MQTT的服务端地址,port是对应的端口号,8883表示的mqtts的连接方式。
原文地址:https://blog.csdn.net/u013896064/article/details/137498262
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!