自学内容网 自学内容网

Websocket在Java中的实践——整合Rabbitmq和STOMP

《Websocket在Java中的实践——STOMP通信的最小Demo》一文中,我们使用enableSimpleBroker启用一个内置的内存级消息代理。本文我们将使用Rabbitmq作为消息代理,这样我们的服务就可以变成分布式部署。

Rabbitmq

开启STOMP支持

在Rabbitmq所在的机器上执行下面的命令:

sudo -H -u rabbitmq bash -c "/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp"

在这里插入图片描述
然后启动Rabbitmq

sudo service rabbitmq-server start

服务端

依赖

spring-boot-starter-websocket用于Websocket服务。
spring-boot-starter-amqp和spring-rabbit-stream都是用于Rabbitmq操作。
reactor-netty用于Broker。

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-stream</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.1.20</version>
</dependency>

参数

src/main/resources/application.properties
需要注意的是,rabbitmq_stomp启动后会开启61613端口。

spring.rabbitmq.host=172.30.254.255
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=fangliang
spring.rabbitmq.stomp.port=61613

在这里插入图片描述
还有一点需要注意,很多文章上说使用guest用户登录。但是guest用户只能在Rabbitmq所在的机器上使用,如果跨机器使用会报下列错误。而且这和是否设置guest为全域无关。所以我们使用admin账户。

Received ERROR {message=[Bad CONNECT], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[26]} session=system text/plain payload=non-loopback access denied

spring.rabbitmq.stomp.port是一个自定义参数,它只是供Broker连接Rabbitmq使用。
spring.rabbitmq.port在当前本文例子中没有使用。

参数映射类

这个类主要是映射上述参数,方便后续使用。
src/main/java/com/nyctlc/stomprbmq/component/RabbitMQProperties.java

package com.nyctlc.stomprbmq.component;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProperties {
    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    public String getRabbitmqPassword() {
        return rabbitmqPassword;
    }

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    public String getRabbitmqUsername() {
        return rabbitmqUsername;
    }

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    public String getRabbitmqHost() {
        return rabbitmqHost;
    }

    @Value("${spring.rabbitmq.port}")
    private String rabbitmqPort;

    public String getRabbitmqPort() {
        return rabbitmqPort;
    }

    @Value("${spring.rabbitmq.stomp.port}")
    private String rabbitmqStompPort;

    public String getRabbitmqStompPort() {
        return rabbitmqStompPort;
    }
}

配置类

/handshake是STOMP和Websocket建立握手的接口。
enableStompBrokerRelay(“/topic”)会订阅Rabbitmq默认的交换器amq.topic的绑定关系中定义的队列。(所以我们看到很多文章订阅的前缀使用的是“topic”,而不用其他字段,这是有渊源的)
在这里插入图片描述
在这里插入图片描述

setRelayPort方法传递的是Rabbitmq的STOMP端口,即61613。
setClientLogin、setClientPasscode、setSystemLogin和setSystemPasscode都要设置为admin及其密码,否则会报错。

src/main/java/com/nyctlc/stomprbmq/config/WebSocketConfig.java

package com.nyctlc.stomprbmq.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import com.nyctlc.stomprbmq.component.RabbitMQProperties;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/handshake");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/send");
        registry.enableStompBrokerRelay("/topic")
                .setRelayHost(rabbitMQProperties.getRabbitmqHost())
                .setRelayPort(Integer.parseInt(rabbitMQProperties.getRabbitmqStompPort()))
                .setClientLogin(rabbitMQProperties.getRabbitmqUsername())
                .setClientPasscode(rabbitMQProperties.getRabbitmqPassword())
                .setSystemLogin(rabbitMQProperties.getRabbitmqUsername())
                .setSystemPasscode(rabbitMQProperties.getRabbitmqPassword());
    }
}

逻辑处理类

这个类的handle方法会接受/send/msg-from-user端点发来的消息,然后转发给Rabbitmq的amp.topic交换器下msg-to-user路由键对应的队列。上述代码创建的Broker会持续监听这个队列,如果收到消息,则发送给客户端。

src/main/java/com/nyctlc/stomprbmq/controller/WebSocketController.java

package com.nyctlc.stomprbmq.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketController {
    @MessageMapping("/msg-from-user")
    @SendTo("/topic/msg-to-user")
    public String handle(String msg) {
        System.out.println("Received message: " + msg);
        return msg;
    }
}

测试

测试页面

src/main/resources/static/index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP over WebSocket Example with StompJs.Client</title>
    <script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs"></script>
</head>
<body>
    <h2>STOMP over WebSocket Example with StompJs.Client</h2>
    <button id="connectButton">Connect</button>
    <form id="messageForm">
        <input type="text" id="messageInput" placeholder="Type a message..."/>
        <button type="submit">Send</button>
    </form>
    <div id="messages"></div>

    <script>
        var client = null;

        function connect() {
            client = new StompJs.Client({
                brokerURL: 'ws://localhost:8080/handshake', // WebSocket服务端点
                connectHeaders: {},
                debug: function (str) {
                    console.log(str);
                },
                reconnectDelay: 5000,
                heartbeatIncoming: 4000,
                heartbeatOutgoing: 4000,
            });

            client.onConnect = function(frame) {
                console.log('Connected: ' + frame);
                client.subscribe('/topic/msg-to-user', function(message) { // 订阅端点
                    showMessageOutput(JSON.parse(message.body).content);
                });
            };

            client.onStompError = function(frame) {
                console.error('Broker reported error: ' + frame.headers['message']);
                console.error('Additional details: ' + frame.body);
            };

            client.activate();
        }

        function sendMessage(event) {
            event.preventDefault(); // 阻止表单默认提交行为
            var messageContent = document.getElementById('messageInput').value.trim();
            if(messageContent && client && client.connected) {
                var chatMessage = { content: messageContent };
                client.publish({destination: "/send/msg-from-user", body: JSON.stringify(chatMessage)}); // 发送端点
                document.getElementById('messageInput').value = '';
            }
        }

        function showMessageOutput(message) {
            var messagesDiv = document.getElementById('messages');
            var messageElement = document.createElement('div');
            messageElement.appendChild(document.createTextNode(message));
            messagesDiv.appendChild(messageElement);
        }

        document.getElementById('messageForm').addEventListener('submit', sendMessage);

        document.getElementById('connectButton').addEventListener('click', connect);
    </script>
</body>
</html>

Controller

这个Controller主要是为了让上述HTML可以通过URL访问。
src/main/java/com/nyctlc/stomprbmq/controller/FileController.java

package com.nyctlc.stomprbmq.controller;

import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

@Controller
public class FileController {

    @GetMapping("/")
    public String index() {
        return "index"; // 返回index.html
    }

    @RequestMapping(value = "/favicon.ico")
    @ResponseStatus(value = HttpStatus.NO_CONTENT)
    public void favicon() {
        // No operation. Just to avoid 404 error for favicon.ico
    }
}

测试案例

在这里插入图片描述
在这里插入图片描述
我们在管理后台直接给这个队列发送消息,前端页面也会收到。比如我们发送{“content”:“message from management”}
在这里插入图片描述
在这里插入图片描述


原文地址:https://blog.csdn.net/breaksoftware/article/details/139907743

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!