自学内容网 自学内容网

mqtt详细介绍及集成到springboot

1.mqtt发布/订阅消息参数详细介绍

  • 1.1. qos
  • QoS=0 ,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver
    发送消息,如果发送失败,也就算了;
  • QoS=1 ,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender
    向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息; .
  • QoS=2 ,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender
    尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保
    证 Receiver 不会因为消息重传而收到重复的消息。
  • QoS 在发布与订阅中的区别
    当客户端 A 的发布 QoS 大于 客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅 QoS。
    当客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
    总结:服务端给订阅端发送消息时,发布端的QoS 和 订阅端的QoS 谁小,就用谁的QoS级别。

2. mqtt客户端连接参数介绍

  • cleanSession
    为 true 时表示创建一个新的会话,每次连接时不会持久化订阅信息或消息队列。如果连接断开,服务器会丢失客户端的会话信息(例如订阅的主题)。
    为 false 时表示创建一个持久会话,在客户端断开连接后会话仍然保持,服务器不会丢失客户端的会话信息(例如订阅的主题),直到会话超时注销。

  • keepAliveInterval
    心跳时间间隔,默认60s
    MQTT 协议中约定:在 1.5*Keep Alive 的时间间隔内,如果 Broker 没有收到来自 Client 的任何数据包,那么 Broker 认为它和 Client 之间的连接已经断开;同样地, 如果 Client 没有收到来自 Broker 的任何数据包,那么 Client 认为它和 Broker 之间的连接已经断开。
    emqx中可以通过日志追踪查看心跳日志。在这里插入图片描述在这里插入图片描述

  • clientId

    • 唯一标识客户端:MQTT 服务器根据 clientId 来识别和管理不同的客户端连接。如果两个客户端连接时使用相同的 clientId,服务器可能会断开第一个客户端的连接并接受第二个客户端的连接,导致第一个客户端的连接被踢出。
    • 消息持久化和会话管理:客户端连接时可以选择是否持久化会话。如果会话持久化,服务器会将客户端的订阅信息、消息等数据与 clientId 绑定,并在客户端断开后保留这些信息,直到客户端重新连接时恢复会话。
    • 维护订阅状态:使用 clientId,服务器能够记住客户端订阅的主题和 QoS 等信息,即使客户端断开了连接,只要在重新连接时使用相同的 clientId,服务器就会恢复这些订阅状态。
    • 如果需要会话持久化、订阅信息持久化等功能时,最好使用固定的 clientId,这样可以确保重新连接时,服务器能够恢复会话信息

3. docker-compose搭建mqtt服务端

不会安装docker的可以参考文章:linux安装docker和docker-compose详细教程
docker-compose.yml配置

version: '3.3'
services:
  mqtt:
    image: emqx/emqx:latest
    container_name: mqtt_server  
    ports:
      - "1883:1883"
      - "8083:8083"
      - "18083:18083"
    networks:
      - mqtt
    volumes:
#      - ./conf/emqx.conf:/opt/emqx/etc/emqx.conf 
#      - ./data:/opt/emqx/data
      - ./log:/opt/emqx/log


networks:
  mqtt:

保存后直接docker-compose up -d启动,访问页面:http://192.168.80.251:18083/
默认账号:admin 密码:public

4. springboot集成mqtt实现发布订阅

gitee项目地址:https://gitee.com/wangyunchao6/springboot-mqtt.git

pom依赖

        <!--        mqtt依赖开始-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!--        mqtt依赖结束-->

application.yml配置

mqtt:
  broker-url: tcp://192.168.80.251:1883 # 替换为你的 MQTT 服务器ip地址
  client-id: mqtt-server #可以随便写
  username: admin # 如果需要认证
  password: public # 如果需要认证
  default-topic: test/topic

mqtt连接配置
注意:订阅主题的时候也可以不调用callback 方法,直接在subscribe中处理业务逻辑。

            mqttClient.subscribe(topic,2, (t, msg) -> {
                System.out.println("Received message from topic: " + t + ", Message: " + new String(msg.getPayload()));
            });

MqttServerConfig,用这个作为mqtt的服务端订阅test/topic主题

package com.example.springbootmqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 接收消息的mqtt服务端配置
 */
@Configuration
@Slf4j
public class MqttServerConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(brokerUrl, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        //心跳时间间隔,默认60S
//        options.setKeepAliveInterval(30);
        //连接超时时间
//        options.setConnectionTimeout(10);
        //设置自动连接
        options.setAutomaticReconnect(true);
        client.connect(options);

        client.setCallback(new MqttCallback() {
            /**
             * 当客户端与 MQTT Broker 的连接意外断开时触发此方法。
             * 断开的原因会通过参数 cause 传递过来
             */
            @Override
            public void connectionLost(Throwable cause) {
                if (!client.isConnected()) {
                    try {
                        client.reconnect();
                    } catch (MqttException e) {
                        log.error("connectio lost,Throwable={}",cause.getMessage());
                        log.error("connectio lost,MqttException={}",e.getMessage());
                    }
                }
            }

            /**
             * 当客户端收到一条消息时触发此方法。
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("Received message from topic: " + topic);
                String payload = new String(message.getPayload());

                // Handle message based on topic
                switch (topic) {
                    case "topic1":
                        break;
                    case "topic2":
                        break;
                    case "topic3":
                        break;
                    default:
                        System.out.println("Unknown topic: " + topic);
                        break;
                }
            }

            /**
             * 当客户端发送的消息成功到达 Broker(仅对 QoS 1 和 QoS 2 消息有效)时触发此方法。
             * 用于确认消息已经完成传输。
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used for subscribers
                try {
                    System.out.println("Delivery complete. Message: " + token.getMessage());
                } catch (Exception e) {
                    e.printStackTrace();
                }            }
        });

        //服务启动时订阅主题
        client.subscribe("test/topic", 2);
        return client;
    }
}

MockMqttClientOneConfig,用这个模拟mqtt客户端向test/topic主题发送数据

package com.example.springbootmqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 模拟发送消息的mqtt客户端1配置
 */
@Configuration
@Slf4j
public class MockMqttClientOneConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mockMqttClientOne() throws MqttException {
        String clientId = "mock-mqtt-client-one";
        MqttClient client = new MqttClient(brokerUrl, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        //心跳时间间隔,默认60S
//        options.setKeepAliveInterval(30);
        //连接超时时间
//        options.setConnectionTimeout(10);
        //设置自动连接
        options.setAutomaticReconnect(true);
        client.connect(options);

        client.setCallback(new MqttCallback() {
            /**
             * 当客户端与 MQTT Broker 的连接意外断开时触发此方法。
             * 断开的原因会通过参数 cause 传递过来
             */
            @Override
            public void connectionLost(Throwable cause) {
                if (!client.isConnected()) {
                    try {
                        client.reconnect();
                    } catch (MqttException e) {
                        log.error("connectio lost,Throwable={}",cause.getMessage());
                        log.error("connectio lost,MqttException={}",e.getMessage());
                    }
                }
            }

            /**
             * 当客户端收到一条消息时触发此方法。
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("Received message from topic: " + topic);
                String payload = new String(message.getPayload());

                // Handle message based on topic
                switch (topic) {
                    case "topic1":
                        break;
                    case "topic2":
                        break;
                    case "topic3":
                        break;
                    default:
                        System.out.println("Unknown topic: " + topic);
                        break;
                }
            }

            /**
             * 当客户端发送的消息成功到达 Broker(仅对 QoS 1 和 QoS 2 消息有效)时触发此方法。
             * 用于确认消息已经完成传输。
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used for subscribers
                try {
                    System.out.println("Delivery complete. Message: " + token.getMessage());
                } catch (Exception e) {
                    e.printStackTrace();
                }            }
        });

        return client;
    }
}

MqttController

package com.example.springbootmqtt.controller;

import com.example.springbootmqtt.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    // 发布消息
    @GetMapping("/publish")
    public String publish(String topic, String message) {
        mqttService.sendMessage(topic, message);
        return "Message published to topic: " + topic;
    }

    // 订阅主题
    @GetMapping("/subscribe")
    public String subscribe( String topic) {
        mqttService.subscribe(topic);
        return "Subscribed to topic: " + topic;
    }

    // 模拟MockMqttClientOne客户端发布消息
    @GetMapping("/mockClientOnepublish")
    public String mockClientOnepublish( String topic, String message) {
        mqttService.mockClientOnepublish(topic, message);
        return "mockClientOnepublish Message published to topic: " + topic;
    }

    // 模拟MockMqttClientTwo客户端发布消息
    @GetMapping("/mockClientTwoPublish")
    public String mockClientTwoPublish( String topic, String message) {
        mqttService.mockClientTwoPublish(topic, message);
        return "mockClientTwoPublish Message published to topic: " + topic;
    }


}

MqttService

package com.example.springbootmqtt.service;

public interface MqttService {


    void sendMessage(String topic, String message);

    void mockClientOnepublish(String topic, String message);

    void mockClientTwoPublish(String topic, String message);

    void subscribe(String topic);

    void sendDefaultMessage(String message);

    void subscribeDefaultTopic();

}

MqttServiceImpl

package com.example.springbootmqtt.service.impl;


import com.example.springbootmqtt.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Slf4j
@Service
public class MqttServiceImpl implements MqttService {

    @Autowired
    @Qualifier("mqttClient")
    private MqttClient mqttClient;
    @Autowired
    @Qualifier("mockMqttClientOne")
    private MqttClient mockMqttClientOne;
    @Autowired
    @Qualifier("mockMqttClientTwo")
    private MqttClient mockMqttClientTwo;

    @Value("${mqtt.default-topic}")
    private String defaultTopic;


    // 发送消息
    @Override
    public void sendMessage(String topic, String message) {
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(1);
            mqttClient.publish(topic, mqttMessage);
            System.out.println("Message sent to topic: " + topic + ", Message: " + message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void mockClientOnepublish(String topic, String message) {
        try {
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(1);
            mockMqttClientOne.publish(topic, mqttMessage);
            System.out.println("Message sent to topic: " + topic + ", Message: " + message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void mockClientTwoPublish(String topic, String message) {
        try {
            String clientId = UUID.randomUUID().toString();
            MqttClient client = new MqttClient("tcp://192.168.80.251:1883", clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("admin");
            options.setPassword("public".toCharArray());
            options.setCleanSession(true);
            //心跳时间间隔,默认60S
//            options.setKeepAliveInterval(30);
            //连接超时时间
//        options.setConnectionTimeout(10);
            //设置自动连接
            options.setAutomaticReconnect(true);
            client.connect(options);
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(1);
            client.publish(topic, mqttMessage);
            System.out.println("Message sent to topic: " + topic + ", Message: " + message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    // 订阅消息
    @Override
    public void subscribe(String topic) {
        try {
            mqttClient.subscribe(topic, (t, msg) -> {
                System.out.println("Received message from topic: " + t + ", Message: " + new String(msg.getPayload()));
            });
            System.out.println("Subscribed to topic: " + topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    // 默认主题发送消息
    @Override
    public void sendDefaultMessage(String message) {
        sendMessage(defaultTopic, message);
    }

    // 默认主题订阅
    @Override
    public void subscribeDefaultTopic() {
        subscribe(defaultTopic);
    }
}

5. 测试

启动程序,调用接口:http://127.0.0.1:8080/mqtt/mockClientTwoPublish?topic=test/topic&message=aaaaaa
查看打印结果
在这里插入图片描述

注意事项

先说结论:

  • messageArrived:由 消息订阅者 使用,在消息被成功接收到时触发。
  • deliveryComplete:由 消息发布者 使用,在消息被成功传输(且确认完成)时触发。

向主题test/topic发送一条消息,看上图的控制台输出结果,消息发布者并没有进入messageArrived方法,消息订阅者并没有进入deliveryComplete方法,所以在编写代码时只需要根据自己的角色,在对应方法写业务逻辑即可。


原文地址:https://blog.csdn.net/weixin_47874230/article/details/137137000

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!