自学内容网 自学内容网

WebSocket


实现一个简单的webSocket,实现异常信息的推送,重试等

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.创建实体类

@Data
public class ExceptionMessage {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String message;
    private LocalDateTime createdAt;
    // 'PENDING', 'SENT', 'FAILED'
    private String status;
    private int retries;
    private LocalDateTime retryAt;
}

3.创建接口

public interface ExceptionMessageMapper extends JpaRepository<ExceptionMessage, Long> {

    // 获取所有状态为PENDING的消息
    List<ExceptionMessage> findByStatus(String status);

    // 获取重试时间早于当前时间的消息
    List<ExceptionMessage> findByStatusAndRetryAtBefore(String status, LocalDateTime now);
}

4.异常消息服务

@Service
public class ExceptionService {

    private static final int MAX_RETRIES = 3;
    private static final long RETRY_INTERVAL_MS = 5000;  // 5秒重试间隔

    private final ExceptionMessageMapper messageMapper;
    private final SimpMessagingTemplate messagingTemplate;

    @Autowired
    public ExceptionService(ExceptionMessageMapper messageMapper, SimpMessagingTemplate messagingTemplate) {
        this.messageMapper = messageMapper;
        this.messagingTemplate = messagingTemplate;
    }

    // 存储异常消息到数据库
    @Transactional
    public void storeExceptionMessage(String exceptionData) {
        ExceptionMessage message = new ExceptionMessage();
        message.setMessage(exceptionData);
        message.setStatus(StatusEnum.PENDING.getStringVal());  // 初始状态为PENDING
        message.setRetries(0);
        message.setCreatedAt(LocalDateTime.now());
        messageMapper.save(message);
    }

    // 向前端发送消息
    public void sendMessage(String exceptionData) {
        messagingTemplate.convertAndSend(TopicEnum.EXCEPTIONS.getStringVal(), exceptionData);
    }

    // 发送单个消息并更新状态
    @Transactional
    public void processMessage(ExceptionMessage message) {
        try {
            // 存储消息到数据库
            storeExceptionMessage(message.getMessage());
            // 推送消息到前端
            sendMessage(message.getMessage());

            // 更新消息状态为SENT
            message.setStatus(StatusEnum.SENT.getStringVal());
            messageMapper.save(message);

        } catch (Exception e) {
            // 如果发送失败,更新为FAILED并设置重试时间
            message.setStatus(StatusEnum.FAILED.getStringVal());
            message.setRetryAt(LocalDateTime.now().plus(Duration.ofMillis(RETRY_INTERVAL_MS)));
            message.setRetries(message.getRetries() + 1);
            messageMapper.save(message);
        }
    }

    // 重试未成功发送的消息
    public void retryFailedMessages() {
        List<ExceptionMessage> failedMessages = messageMapper.findByStatusAndRetryAtBefore(StatusEnum.FAILED.getStringVal(), LocalDateTime.now());

        for (ExceptionMessage message : failedMessages) {
            if (message.getRetries() < MAX_RETRIES) {
                processMessage(message);
            } else {
                // 超过最大重试次数,放弃推送
                message.setStatus(StatusEnum.GIVE_UP.getStringVal());
                messageMapper.save(message);
            }
        }
    }

    // 重新发送所有PENDING状态的消息
    public void resendPendingMessages() {
        List<ExceptionMessage> pendingMessages = messageMapper.findByStatus(StatusEnum.PENDING.getStringVal());

        for (ExceptionMessage message : pendingMessages) {
            processMessage(message);
        }
    }
}

5.WebSocket 控制器

处理WebSocket连接和消息的推送。

@RestController
@RequestMapping(value = "/app")
public class WebSocketController {
    private final org.example.webSocket.service.ExceptionService exceptionService;

    public WebSocketController(org.example.webSocket.service.ExceptionService exceptionService) {
        this.exceptionService = exceptionService;
    }

    /**
     * 由前端通过 STOMP 协议 调用的,这是一个处理连接请求的方法。
     * 每当前端连接到 WebSocket 时,会触发此方法执行,从而执行恢复未发送的消息
     **/
    @MessageMapping("/connect")
    public void handleConnect() {
        exceptionService.resendPendingMessages();
    }

    /**
     * 每5秒定时重试失败的消息
     **/
    @Scheduled(fixedRate = 5000)
    public void retryFailedMessages() {
        exceptionService.retryFailedMessages();
    }
}

6.配置 WebSocket

在Spring Boot中启用WebSocket。

/**
 * webSocket配置类,注册WebSocket端点
 */
@Configuration
@EnableWebSocketMessageBroker
@EnableScheduling
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 启用简单的消息代理,用于处理客户端的订阅(如:/topic/xxx)
        registry.enableSimpleBroker("/topic");
        // 设置应用程序的前缀,用于处理来自客户端的消息(如:/app/xxx)
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册 WebSocket 端点,客户端通过此端点连接 WebSocket 服务
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }
}

7.客户端 (Vue.js)

在Vue前端,你需要在WebSocket连接成功时向后端发送连接请求以获取未发送的消息。

import { Stomp } from '@stomp/stompjs';
import SockJS from 'sockjs-client';

export default {
  data() {
    return {
      exceptionData: [],
      exceptionCount: 0,
    };
  },
  mounted() {
    this.connectToWebSocket();
  },
  methods: {
    connectToWebSocket() {
      const socket = new SockJS('/ws');
      const stompClient = Stomp.over(socket);

      stompClient.connect({}, (frame) => {
        // 订阅异常消息
        stompClient.subscribe('/topic/exceptions', (message) => {
          this.exceptionData.push(message.body);
          this.exceptionCount = this.exceptionData.length;
        });

        // 发送连接请求,获取未发送的消息
        stompClient.send("/app/connect");
      });
    }
  }
};


原文地址:https://blog.csdn.net/qq_45765353/article/details/144764443

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!