WebSocket
项目需要,要用到WebSocketConfig,网上找资料研究了下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
最重要的注册到spring容器,spring boot自动会配置
package com.shinedata.wordtools.config.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
//开启WebSocket的支持,并把该类注入到spring容器中
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
端点实体类
package com.shinedata.wordtools.config.websocket;
import lombok.Data;
import javax.websocket.Session;
import java.time.LocalDateTime;
@Data
public class ClientInfoEntity {
/**
* 客户端唯一标识
*/
private String token;
/**
* 客户端连接的session
*/
private Session session;
/**
* 连接存活时间
*/
private LocalDateTime existTime;
}
消息处理和自定义配置类
package com.shinedata.wordtools.config.websocket;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.CrossOrigin;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/webSocket/{token}", configurator = GetHttpSessionConfig.class)//参数configurator 一个类,用于提供配置 WebSocket 端点的自定义配置器,这允许在 WebSocket 端点创建和配置过程中进行额外的设置
public class ChatEndpoint2 {
//key:客户端连接唯一标识(token)
//value:ClientInfoEntity
private static final Map<String, ClientInfoEntity> uavWebSocketInfoMap = new ConcurrentHashMap<String, ClientInfoEntity>();
private static final int EXIST_TIME_HOUR = 6;
/**
* 连接建立成功调用的方法
*
* @param session 第一个参数必须是session
* @param sec
* @param token 代表客户端的唯一标识
*/
@OnOpen
public void onOpen(Session session, EndpointConfig sec, @PathParam("token") String token) {
if (uavWebSocketInfoMap.containsKey(token)) {
throw new RuntimeException("token已建立连接");
}
//把成功建立连接的会话在实体类中保存
ClientInfoEntity entity = new ClientInfoEntity();
entity.setToken(token);
entity.setSession(session);
//默认连接6个小时
entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
uavWebSocketInfoMap.put(token, entity);
//之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
log.info("WebSocket 连接建立成功: " + token);
}
/**
* 当断开连接时调用该方法
*
* @param session
*/
@OnClose
public void onClose(Session session, @PathParam("token") String token) {
// 找到关闭会话对应的用户 ID 并从 uavWebSocketInfoMap 中移除
if (ObjectUtil.isNotEmpty(token) && uavWebSocketInfoMap.containsKey(token)) {
uavWebSocketInfoMap.remove(token);
log.info("WebSocket 连接关闭成功: " + token);
}
}
/**
* 接受消息
* 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
*
*/
@OnMessage
public void onMessage(Session session, @PathParam("token") String token, String message) throws IOException {
log.info("接收到消息:" + message);
ClientInfoEntity entity = uavWebSocketInfoMap.get(token);
//如果是心跳包
if("heartbeat".equals(message)){
//只要接受到客户端的消息就进行续命(时间)
entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
if (entity.getSession().isOpen()) {
entity.getSession().getBasicRemote().sendText("{\"msg\": \"success\", \"code\": 0}");
}
return;
}
//业务逻辑
//只要接受到客户端的消息就进行续命(时间)
entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
if (entity.getSession().isOpen()) {
entity.getSession().getBasicRemote().sendText("{\"msg\": \"success\", \"code\": 0}");
}
}
/**
* 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
*/
@OnError
public void onError(Throwable error) {
log.error("报错信息:" + error.getMessage());
error.printStackTrace();
}
private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy:MM:dd hh:mm:ss");
/**
* 发生消息定时器
*/
@Scheduled(cron = "0/1 * * * * ? ")
public void refreshDate() {
//开启定时任务,1秒一次向前台发送当前时间
//当没有客户端连接时阻塞等待
if (!uavWebSocketInfoMap.isEmpty()) {
//超过存活时间进行删除
Iterator<Map.Entry<String, ClientInfoEntity>> iterator = uavWebSocketInfoMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ClientInfoEntity> entry = iterator.next();
if (entry.getValue().getExistTime().compareTo(LocalDateTime.now()) <= 0) {
log.info("WebSocket " + entry.getKey() + " 已到存活时间,自动断开连接");
try {
entry.getValue().getSession().close();
} catch (IOException e) {
log.error("WebSocket 连接关闭失败: " + entry.getKey() + " - " + e.getMessage());
}
//过期则进行移除
iterator.remove();
}
}
sendMessage(FORMAT.format(new Date()));
}
}
/**
* 群发信息的方法
*
* @param message 消息
*/
public void sendMessage(String message) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
+ "发送全体消息:" + message);
//循环客户端map发送消息
uavWebSocketInfoMap.values().forEach(item -> {
//向每个用户发送文本信息。这里getAsyncRemote()解释一下,向用户发送文本信息有两种方式,
// 一种是getBasicRemote,一种是getAsyncRemote
//区别:getAsyncRemote是异步的,不会阻塞,而getBasicRemote是同步的,会阻塞,由于同步特性,第二行的消息必须等待第一行的发送完成才能进行。
// 而第一行的剩余部分消息要等第二行发送完才能继续发送,所以在第二行会抛出IllegalStateException异常。所以如果要使用getBasicRemote()同步发送消息
// 则避免尽量一次发送全部消息,使用部分消息来发送,可以看到下面sendMessageToTarget方法内就用的getBasicRemote,因为这个方法是根据用户id来私发的,所以不是全部一起发送。
item.getSession().getAsyncRemote().sendText(message,new SendHandler() {
@Override
public void onResult(SendResult result) {
if (result.isOK()) {
// 消息发送成功的处理逻辑
} else {
// 消息发送失败的处理逻辑
}
}
});
});
}
}
package com.shinedata.wordtools.config.websocket;
import org.springframework.context.annotation.Configuration;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.util.Map;
import java.util.UUID;
@Configuration
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator{
/**
* 注意: 每一个客户端发起握手,端点就有一个新的实列,那么引用的这个配置也是新的实列,这里sec的用户属性也不同就不会产生冲突。
* 修改握手机制 就是第一次http发送过来的握手
* @param sec 服务器websocket端点的配置
* @param request
* @param response
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
// 将从握手的请求中获取httpsession
HttpSession httpSession =(HttpSession) request.getHttpSession();
/**
* 一般会在请求头中添加token 解析出来id作为键值对
*/
Map<String, Object> properties = sec.getUserProperties();
/**
* 一个客户端和和服务器发起一次请求交互 就有一个唯一session
* 设置唯一标识:为每个客户端生成一个唯一的UUID作为连接标识,并将其存储在UserProperties中,便于后续跟踪与管理
*/
// properties.put(HttpSession.class.getName(),httpSession);
String sessionKey = UUID.randomUUID().toString().replaceAll("-", "");
properties.put("Connected",sessionKey);
}
}
WebSocket在线测试工具 找个网站测试下 websocket通信地址以ws://开头,我的springboot启动在8070端口,所以我的地址是 ws://127.0.0.1:8070/api/websocket/{连接token或自己定义一个id等}
原文地址:https://blog.csdn.net/ypp91zr/article/details/143565055
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!