自学内容网 自学内容网

HTTP中的event-stream,eventsource,SSE,chatgpt,stream request,golang

我们都知道chatgpt是生成式的,因此它返回给客户端的消息也是一段一段的,所以普通的HTTP协议无法满足,当然websocket是能满足的,但是这个是双向的通信,其实 SSE(Server-Sent Events) 正好满足这个需求。

SSE相比websocket的优点:

  • SSE是使用http协议,而websocket是一种单独的协议。
  • SSE是单向传输,只能服务端向客户端推送,websocket是双向。
  • SSE支持断点续传,websocket需要自己实现。
  • SSE支持自动重连、轻量级。
  • SSE支持发送自定义类型消息。
  • SSE的响应头Content-Typ:text/event-stream

要实现SSE,服务端需要设置以下Headers

"Content-Type""text/event-stream"
"Cache-Control""no-cache"
"Connection""keep-alive"
"Access-Control-Allow-Origin": "*" // 跨域问题
一、前端代码

我看网络上有两种实现方式:fetch 和 EventSource

fetch方式

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <title>Event Stream Demo</title>
    <style type="text/css">
        body {
            font-family: Arial, sans-serif;
            text-align: center;
        }

        #event-stream-data {
            margin: 50px auto;
            max-width: 600px;
            border: 1px solid #ccc;
            padding: 10px;
        }
    </style>
</head>

<body>
    <div id="event-stream-data"></div>
</body>

<script>
    const eventStreamDataElement = document.getElementById('event-stream-data');
    function handleEventStreamMessage(event) {
        console.log(event)
        const eventText = event.data;
        displayEvent(eventText);
    }

    function displayEvent(eventText) {
        const eventElement = document.createElement('p');
        eventElement.textContent = eventText;
        eventStreamDataElement.appendChild(eventElement);
    }

    function connectToEventStream() {
        fetch('http://127.0.0.1:8080/stream', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            body: {
                data: 'example'
            }
        })
            .then(response => {
                const reader = response.body.getReader();
                const decoder = new TextDecoder();

                return reader.read().then(function processResult(result) {
                    // console.log(result)
                    if (result.done) {
                        return;
                    }

                    const chunk = decoder.decode(result.value, {
                        stream: true
                    });
                    handleEventStreamMessage({
                        data: chunk
                    });

                    return reader.read().then(processResult);
                });
            })
            .catch(error => {
                console.error('Error occurred while fetching event stream:', error);
            });
    }
    connectToEventStream();
</script>

</html>

EventSource方式

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <title>Event Stream Demo</title>
    <style type="text/css">
        body {
            font-family: Arial, sans-serif;
            text-align: center;
        }

        #event-stream-data {
            margin: 50px auto;
            max-width: 600px;
            border: 1px solid #ccc;
            padding: 10px;
        }
    </style>
</head>

<body>
    <div id="event-stream-data"></div>
</body>

<script type="text/javascript">
    const eventStreamDataElement = document.getElementById('event-stream-data');
    function handleEventStreamMessage(event) {
        console.log(event)
        const eventText = event.data;
        displayEvent(eventText);
    }

    function displayEvent(eventText) {
        const eventElement = document.createElement('p');
        eventElement.textContent = eventText;
        eventStreamDataElement.appendChild(eventElement);
    }

    // 向后端服务器发起sse请求
    const es = new EventSource("http://127.0.0.1:8080/stream");
    // Event 和 Message 分开处理,需要显示的监听事件,否则不会处理事件
    es.onmessage = function (e) {
        handleEventStreamMessage(e);
    }
    // 监听事件流
    es.addEventListener("start", (e) => {
        handleEventStreamMessage(e);
    });
    es.addEventListener("end", (e) => {
        handleEventStreamMessage(e);

        // 一定要关闭连接,否则会一直轮训
        es.close()
    });
    es.onerror = function (e) {
        // readyState说明
        // 0:浏览器与服务端尚未建立连接或连接已被关闭
        // 1:浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据
        // 2:浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接
        console.log("readyState = " + e.currentTarget.readyState);
    }
</script>

</html>
二、GIN 中自带的 SSE
package main

import (
"time"

"github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
)

func main() {
engin := gin.Default()

engin.Any("/stream", func(c *gin.Context) {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Headers", "*")

// c.SSEvent("start", "start...")
sse.Event{
Id:    "1",
Event: "start",
Data:  "start...",
}.Render(c.Writer)

c.Writer.Flush()
time.Sleep(1 * time.Second)

for i := 0; i < 10; i++ {
sse.Event{
Id:   "1",
Data: "SSE data",
}.Render(c.Writer)

c.Writer.Flush() // 需要手动刷新输出缓冲区

time.Sleep(1 * time.Second)
}

// c.SSEvent("end", "end...")
sse.Event{
Id:    "1",
Event: "end",
Data:  "end...",
}.Render(c.Writer)
})

engin.Run(":8080")
}

说明

// sse.Event
type Event struct {
Event string
Id    string
Retry uint
Data  interface{}
}

sse.Event 结构在渲染的时候会自动加上前缀和后面的回车,比如id:xxx\nevent:xxx\nretry:xxx\ndata:xxx\n\n,因此在设置内容的时候不需要关心format。

并且会自动填充两个响应头
Content-Type: text/event-stream
Cache-Control: no-cache

如果服务器端提供了event参数,那么客户端就需要使用addEventListener 显式监听这个事件,才会正常获取消息,否则事件不会触发。如果服务器端没有提供event 参数,只有id、data等,可以使用onmessage回调监听消息。

id 的意思是 lastEventId,用途不明。

完整的数据结构是:id:xxx\nevent:xxx\nretry:xxx\ndata:xxx\n\n

一般只需要data字段即可,后面接一个json串。

前端使用EventSource对象发起请求

在这里插入图片描述

在这里插入图片描述

使用 fetch 的方式发起请求,需要先打开调试并打开接口的响应预览tab,否则是看不到响应结果的。

在这里插入图片描述

在这里插入图片描述

使用EventSource对象发起请求与使用fetch方式的请求两者的区别在于,在处理响应结果的时候,前者是按照SSE协议来处理消息中的\n\n\n以及那几个字段;而后者则不会。下面是打印结果

EventSource示例

在这里插入图片描述

fetch示例

在这里插入图片描述

上面的实现仅仅是为了满足ChatGPT这种对话形式,或者说仅仅实现了一个长连接下的流式传输,即使不适用SSE也能实现。

如果想要实现真正的消息推送还需要对客户端连接进行管理,在这一块,SSE和websocket要做的事情差不多,这里就不展开了。

三、使用golang请求chatgpt

大部分的时候,在客户端和chatgpt之间还需要有一个代理层,即它代替用户向chatgpt发起请求,接收数据流,然后将数据流转发给用户。前面已经实现了SSE,所以,这里需要处理的是golang发起stream request。

package main

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"
"time"
)

func main() {
client := &http.Client{Timeout: time.Second * 20}
req, _ := http.NewRequest("POST", "http://127.0.0.1:8080/stream", strings.NewReader(""))

resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}

reader := bufio.NewReader(resp.Body)
defer resp.Body.Close()

for {
rawLine, err := reader.ReadBytes('\n')
if errors.Is(err, io.EOF) {
return
} else if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(bytes.TrimRight(rawLine, "\n")))
}
}
id:1
event:start
data:start...

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
data:SSE data

id:1
event:end
data:end...

golang对接openai:https://github.com/sashabaranov/go-openai


原文地址:https://blog.csdn.net/raoxiaoya/article/details/142375363

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!