自学内容网 自学内容网

WebSocket

项目需要,要用到WebSocketConfig,网上找资料研究了下

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

最重要的注册到spring容器,spring boot自动会配置

package com.shinedata.wordtools.config.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;


//开启WebSocket的支持,并把该类注入到spring容器中
@Configuration
public class WebSocketConfig {
    
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

端点实体类

package com.shinedata.wordtools.config.websocket;

import lombok.Data;

import javax.websocket.Session;
import java.time.LocalDateTime;

@Data
public class ClientInfoEntity {

    /**
     * 客户端唯一标识
     */
    private String token;
    /**
     * 客户端连接的session
     */
    private Session session;
    /**
     * 连接存活时间
     */
    private LocalDateTime existTime;
}

消息处理和自定义配置类

package com.shinedata.wordtools.config.websocket;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.CrossOrigin;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/webSocket/{token}", configurator = GetHttpSessionConfig.class)//参数configurator 一个类,用于提供配置 WebSocket 端点的自定义配置器,这允许在 WebSocket 端点创建和配置过程中进行额外的设置
public class ChatEndpoint2 {

    //key:客户端连接唯一标识(token)
    //value:ClientInfoEntity
    private static final Map<String, ClientInfoEntity> uavWebSocketInfoMap = new ConcurrentHashMap<String, ClientInfoEntity>();

    private static final int EXIST_TIME_HOUR = 6;

    /**
     * 连接建立成功调用的方法
     *
     * @param session 第一个参数必须是session
     * @param sec
     * @param token   代表客户端的唯一标识
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig sec, @PathParam("token") String token) {
        if (uavWebSocketInfoMap.containsKey(token)) {
            throw new RuntimeException("token已建立连接");
        }

        //把成功建立连接的会话在实体类中保存
        ClientInfoEntity entity = new ClientInfoEntity();
        entity.setToken(token);
        entity.setSession(session);
        //默认连接6个小时
        entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
        uavWebSocketInfoMap.put(token, entity);
        //之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
        log.info("WebSocket 连接建立成功: " + token);
    }

    /**
     * 当断开连接时调用该方法
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session, @PathParam("token") String token) {
        // 找到关闭会话对应的用户 ID 并从 uavWebSocketInfoMap 中移除
        if (ObjectUtil.isNotEmpty(token) && uavWebSocketInfoMap.containsKey(token)) {
            uavWebSocketInfoMap.remove(token);
            log.info("WebSocket 连接关闭成功: " + token);
        }
    }

    /**
     * 接受消息
     * 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
     *
     */
    @OnMessage
    public void onMessage(Session session, @PathParam("token") String token, String message) throws IOException {
        log.info("接收到消息:" + message);

        ClientInfoEntity entity = uavWebSocketInfoMap.get(token);
        //如果是心跳包
        if("heartbeat".equals(message)){
            //只要接受到客户端的消息就进行续命(时间)
            entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
            if (entity.getSession().isOpen()) {
                entity.getSession().getBasicRemote().sendText("{\"msg\": \"success\", \"code\": 0}");
            }
            return;
        }
        //业务逻辑

        //只要接受到客户端的消息就进行续命(时间)
        entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
        if (entity.getSession().isOpen()) {
            entity.getSession().getBasicRemote().sendText("{\"msg\": \"success\", \"code\": 0}");
        }
    }

    /**
     * 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
     */
    @OnError
    public void onError(Throwable error) {
        log.error("报错信息:" + error.getMessage());
        error.printStackTrace();

    }

    private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy:MM:dd hh:mm:ss");

    /**
     * 发生消息定时器
     */
    @Scheduled(cron = "0/1 * *  * * ? ")
    public void refreshDate() {
        //开启定时任务,1秒一次向前台发送当前时间
        //当没有客户端连接时阻塞等待
        if (!uavWebSocketInfoMap.isEmpty()) {
            //超过存活时间进行删除
            Iterator<Map.Entry<String, ClientInfoEntity>> iterator = uavWebSocketInfoMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, ClientInfoEntity> entry = iterator.next();
                if (entry.getValue().getExistTime().compareTo(LocalDateTime.now()) <= 0) {
                    log.info("WebSocket " + entry.getKey() + " 已到存活时间,自动断开连接");
                    try {
                        entry.getValue().getSession().close();
                    } catch (IOException e) {
                        log.error("WebSocket 连接关闭失败: " + entry.getKey() + " - " + e.getMessage());
                    }
                    //过期则进行移除
                    iterator.remove();
                }
            }
            sendMessage(FORMAT.format(new Date()));
        }
    }

    /**
     * 群发信息的方法
     *
     * @param message 消息
     */
    public void sendMessage(String message) {
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                + "发送全体消息:" + message);
        //循环客户端map发送消息
        uavWebSocketInfoMap.values().forEach(item -> {
            //向每个用户发送文本信息。这里getAsyncRemote()解释一下,向用户发送文本信息有两种方式,
            // 一种是getBasicRemote,一种是getAsyncRemote
            //区别:getAsyncRemote是异步的,不会阻塞,而getBasicRemote是同步的,会阻塞,由于同步特性,第二行的消息必须等待第一行的发送完成才能进行。
            // 而第一行的剩余部分消息要等第二行发送完才能继续发送,所以在第二行会抛出IllegalStateException异常。所以如果要使用getBasicRemote()同步发送消息
            // 则避免尽量一次发送全部消息,使用部分消息来发送,可以看到下面sendMessageToTarget方法内就用的getBasicRemote,因为这个方法是根据用户id来私发的,所以不是全部一起发送。
            item.getSession().getAsyncRemote().sendText(message,new SendHandler() {
                @Override
                public void onResult(SendResult result) {
                    if (result.isOK()) {
                        // 消息发送成功的处理逻辑
                    } else {
                        // 消息发送失败的处理逻辑
                    }
                }
            });
        });
    }
}
package com.shinedata.wordtools.config.websocket;

import org.springframework.context.annotation.Configuration;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.util.Map;
import java.util.UUID;

@Configuration
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator{

    /**
     * 注意:  每一个客户端发起握手,端点就有一个新的实列,那么引用的这个配置也是新的实列,这里sec的用户属性也不同就不会产生冲突。
     * 修改握手机制  就是第一次http发送过来的握手
     * @param sec   服务器websocket端点的配置
     * @param request
     * @param response
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
//        将从握手的请求中获取httpsession
        HttpSession httpSession =(HttpSession) request.getHttpSession();


        /**
         * 一般会在请求头中添加token 解析出来id作为键值对
         */
        Map<String, Object> properties = sec.getUserProperties();
        /**
         * 一个客户端和和服务器发起一次请求交互 就有一个唯一session
         * 设置唯一标识:为每个客户端生成一个唯一的UUID作为连接标识,并将其存储在UserProperties中,便于后续跟踪与管理
         */
//        properties.put(HttpSession.class.getName(),httpSession);
        String sessionKey = UUID.randomUUID().toString().replaceAll("-", "");
        properties.put("Connected",sessionKey);
    }
}

WebSocket在线测试工具 找个网站测试下 websocket通信地址以ws://开头,我的springboot启动在8070端口,所以我的地址是 ws://127.0.0.1:8070/api/websocket/{连接token或自己定义一个id等}


原文地址:https://blog.csdn.net/ypp91zr/article/details/143565055

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!