自学内容网 自学内容网

SSE(Server-Send-Event)服务端推送数据技术

SSE(Server-Send-Event)服务端推送数据技术

大家是否遇到过服务端需要主动传输数据到客户端的情况,目前有三种解决方案。

  1. 客户端轮询更新数据。
  2. 服务端与客户端建立 Socket 连接双向通信
  3. 服务端与客户建立 SSE 连接单向通信

几种方案的比较:

  1. 轮询:

    客户端通过频繁请求向服务端请求数据,达到类似实时更新的效果。轮询的优点是实现简单,但是会给服务端和网络带来额外的压力,且延迟较高。

  2. WebSocket连接:

    服务端与客户端建立Socket连接进行数据传输,Socket的传输方式是全双工的。WebSocket是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。

  3. SSE推送:

    SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术,只允许单向通讯。相较于 WebSocket,SSE 更简单、更轻量级。

下面是SpringBoot使用SSE的步骤和示例代码

  1. 配置依赖

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-validation</artifactId>
            </dependency>
    

    SSE已经集成到spring-web中,所以可以直接使用。

  2. 后端代码

    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/sse")
    @Slf4j
    @Validated
    public class SseTestController {
    
        @Autowired
        private SseService service;
    
    
        @GetMapping("/testSse")
        public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            final SseEmitter emitter = service.getConn(clientId);
            CompletableFuture.runAsync(() -> {
                try {
                    service.send(clientId);
                    log.info("建立连接成功!clientId = {}", clientId);
                } catch (Exception e) {
                    log.error("推送数据异常");
                }
            });
            return emitter;
        }
    
    
        @GetMapping("/sseConection")
        public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            return service.getConn(clientId);
        }
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam("clientId") String clientId) {
            try {
                // 异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.send(clientId);
                    } catch (Exception e) {
                        log.error("推送数据异常");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @GetMapping("/sendMsgToAll")
        public void sendMsgToAll() {
            try {
                //异步发送消息
                CompletableFuture.runAsync(() -> {
                    try {
                        service.sendToAll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        @GetMapping("closeConn/{clientId}")
        public String closeConn(@PathVariable("clientId") @NotBlank(message = "客户端id不能为空") String clientId) {
            service.closeConn(clientId);
            return "连接已关闭";
        }
    
    
    }
    
    
    package com.wry.wry_test.service;
    
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    
    public interface SseService {
    
    
        /**
         * 获取连接
         * @param clientId 客户端id
         * @return
         */
        SseEmitter getConn(String clientId);
    
        /**
         *  发送消息到指定客户端
         * @param clientId 客户端id
         * @throws Exception
         */
        void send(String clientId);
    
        /**
         * 发送消息到所有SSE客户端
         * @throws Exception
         */
        void sendToAll() throws Exception;
    
        /**
         * 关闭指定客户端的连接
         * @param clientId 客户端id
         */
        void closeConn(String clientId);
    }
    
    
    package com.wry.wry_test.service.impl;
    
    import com.wry.wry_test.service.SseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import javax.validation.constraints.NotBlank;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    @Service
    @Slf4j
    public class SseServiceImpl implements SseService {
    
        private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
    
        @Override
        public SseEmitter getConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
    
            if (sseEmitter != null) {
                return sseEmitter;
            } else {
                // 设置连接超时时间,需要配合配置项 spring.mvc.async.request-timeout: 600000 一起使用
                final SseEmitter emitter = new SseEmitter(600_000L);
                // 注册超时回调,超时后触发
                emitter.onTimeout(() -> {
                    log.info("连接已超时,正准备关闭,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                });
                // 注册完成回调,调用 emitter.complete() 触发
                emitter.onCompletion(() -> {
                    log.info("连接已关闭,正准备释放,clientId = {}", clientId);
                    SSE_CACHE.remove(clientId);
                    log.info("连接已释放,clientId = {}", clientId);
                });
                // 注册异常回调,调用 emitter.completeWithError() 触发
                emitter.onError(throwable -> {
                    log.error("连接已异常,正准备关闭,clientId = {}", clientId, throwable);
                    SSE_CACHE.remove(clientId);
                });
    
                SSE_CACHE.put(clientId, emitter);
                log.info("建立连接成功!clientId = {}", clientId);
                return emitter;
            }
        }
    
        /**
         * 模拟类似于 chatGPT 的流式推送回答
         *
         * @param clientId 客户端 id
         * @throws IOException 异常
         */
        @Override
        public void send(@NotBlank String clientId) {
            final SseEmitter emitter = SSE_CACHE.get(clientId);
            if (emitter == null) return;
    
            // 开始推送数据
            // todo 模拟推送数据
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                try {
                    this.sseSend(emitter, msg, clientId);
                    Thread.sleep(1000);
                } catch (Exception e) {
                    log.error("推送数据异常", e);
                    break;
                }
            }
    
            log.info("推送数据结束,clientId = {}", clientId);
            // 结束推流
            emitter.complete();
        }
    
        /**
         * 发送数据给所有连接
         */
        public void sendToAll() {
            List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());
            for (int i = 0; i < 10000000; i++) {
                String msg = "SSE 测试数据";
                this.sseSend(emitters, msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void closeConn(@NotBlank String clientId) {
            final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter  sse长连接
         * @param data     发送数据
         * @param clientId 客户端id
         */
        private void sseSend(SseEmitter emitter, Object data, String clientId) {
            try {
                emitter.send(data);
                log.info("推送数据成功,clientId = {}", clientId);
            } catch (Exception e) {
                log.error("推送数据异常", e);
                throw new RuntimeException("推送数据异常");
            }
        }
    
        /**
         * 推送数据封装
         *
         * @param emitter sse长连接
         * @param data    发送数据
         */
        private void sseSend(List<SseEmitter> emitter, Object data) {
    
            emitter.forEach(e -> {
                try {
                    e.send(data);
                } catch (IOException ioException) {
                    log.error("推送数据异常", ioException);
                }
            });
            log.info("推送数据成功");
        }
    
    }
    

    实现效果如下:服务端不断推送数据到前端,前端可以也可以调用接口主动关闭连接。

    image-20240710180401231

适用场景:SSE由于是服务端单向通讯,所以适合那种需要单向持久的连接。比如:

  • ChatGPT这种实时加载会话数据
  • 文件下载,通过SSE异步下载文件
  • 服务端实时数据推送

原文地址:https://blog.csdn.net/pig_boss/article/details/140359530

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!