基于Socket实现Http类SSE效果
- SocketServer
public class TomcatServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
while (true) {
//创建一个字节集合
List<Byte> contentBytes = new ArrayList<Byte>();
//先读取第一行
while (true) {
int byteData = is.read();
contentBytes.add((byte) byteData);
// System.out.println("byteToStringStr" + byteToStringStr(contentBytes));
if (byteData == 10 && (countBytes(contentBytes, (byte) 10) == 1)) {
//读取到了换行符
System.out.println("Http请求行:" + byteToString(contentBytes));
}
//继续判断是否是以 \r\n\r\n 结束,表示请求头结束了
if (endWithHeader(contentBytes)){
System.out.println("完整的Http协议请求: \n" + byteToString(contentBytes));
break;
}
}
writeSse(os);
//os.write(("已经到请求,共计字节数:" + contentBytes.size() + "\r\n").getBytes(StandardCharsets.UTF_8) );
//os.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
public static Integer countBytes(List<Byte> byteList,byte target) {
Map<Byte, Integer> countMap = new HashMap<>();
for (Byte b : byteList) {
if (countMap.containsKey(b)) {
countMap.put(b, countMap.get(b) + 1);
} else {
countMap.put(b, 1);
}
}
return countMap.get((byte)target) == null ? 0 : countMap.get(target);
}
private static boolean endWithHeader(List<Byte> contentBytes) {
if (contentBytes.size() < 4) {
return false;
}
Byte last1 = contentBytes.get(contentBytes.size() - 1);
Byte last2 = contentBytes.get(contentBytes.size() - 2);
Byte last3 = contentBytes.get(contentBytes.size() - 3);
Byte last4 = contentBytes.get(contentBytes.size() - 4);
if (last1 == '\n' && last2 == '\r' && last3 == '\n' && last4 == '\r') {
return true;
}
return false;
}
private static String byteToStringStr(List<Byte> contentBytes) {
return Arrays.toString(contentBytes.toArray());
}
private static String byteToString(List<Byte> contentBytes) {
byte[] byteArray = new byte[contentBytes.size()];
for (int i = 0; i < contentBytes.size(); i++) {
byteArray[i] = contentBytes.get(i);
}
String str = new String(byteArray);
return str;
}
private static void writeSse(OutputStream os) throws Exception{
String serverJsonContent = "{\n" +
" \"code\": 2000,\n" +
" \"data\": {\n" +
" \"url\": \"https://tc-cdn.flowus.cn/oss/54b87779-e9c8-434b-99a4-122a7fb61f04/751f62a5-840a-4f98-8ab9-9a4529ade142/%E4%BA%B2%E5%AD%90%E7%BB%98%E6%9C%AC.jpeg?time=1736474400&token=68b869c7f7649f7fd5998810ef0927fd3332396e50186f49bad4b76c166c16c7&role=free\",\n" +
" \"ttl\": 5400\n" +
" }\n" +
"}";
//转为字节数组
byte[] contentBytes = serverJsonContent.getBytes(StandardCharsets.UTF_8);
//每次发送16个字节
int itemSendByteCount = 16;
//总字节
int totalByteCount = contentBytes.length;
//已发送字节数
int senDedByteCount = 0;
while (true){
if ( (totalByteCount - senDedByteCount) <= itemSendByteCount ){
JSONObject dataPacket = getCurrentDataPacket(senDedByteCount,itemSendByteCount,contentBytes);
os.write(dataPacket.toString().getBytes(StandardCharsets.UTF_8));
os.write('\r');
os.write('\n');
os.flush();
System.out.println("发送最后一次数据包:===》 " + dataPacket);
break;
}
//获取本次发送的数据包
JSONObject dataPacket = getCurrentDataPacket(senDedByteCount,itemSendByteCount,contentBytes);
senDedByteCount = senDedByteCount + itemSendByteCount;
os.write(dataPacket.toString().getBytes(StandardCharsets.UTF_8));
os.write('\r');
os.write('\n');
os.flush();
Thread.sleep(200);
System.out.println("发送数据包:===》 " + dataPacket);
}
//发送一个0字节的数据包
JSONObject endDataPacket = getSseEndDataPacket();
os.write(endDataPacket.toString().getBytes(StandardCharsets.UTF_8));
os.write('\r');
os.write('\n');
os.flush();
System.out.println("发送结束数据包:===》 " + endDataPacket);
}
private static JSONObject getSseEndDataPacket() {
JSONObject dataPacket = new JSONObject();
dataPacket.put("length",0);
return dataPacket;
}
private static JSONObject getCurrentDataPacket(int senDedByteCount, int itemSendByteCount, byte[] contentBytes) {
JSONObject dataPacket = new JSONObject();
try {
int endIndex = senDedByteCount + itemSendByteCount;
if (endIndex > contentBytes.length - 1){
endIndex = contentBytes.length;
}
int sendCount = endIndex - senDedByteCount;
dataPacket.put("length",sendCount);
byte[] bytes = Arrays.copyOfRange(contentBytes, senDedByteCount, endIndex);
dataPacket.put("data",bytes);
}catch (Exception e){
e.printStackTrace();
}
return dataPacket;
}
}
- Client
import com.alibaba.fastjson.JSONObject;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
public class Client {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1",8080);
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
OutputStream os = socket.getOutputStream();
Scanner sc = new Scanner(System.in);
while(true){
System.out.print("请输入Http请求行,回车键结束 : ");
String httpLine = sc.nextLine();
httpLine = httpLine + "\r\n";
os.write(httpLine.getBytes());
os.flush();
System.out.print("请输入Http请求头,回车键结束 : ");
String httpHeader = sc.nextLine();
httpHeader = httpHeader + "\r\n\r\n";
os.write(httpHeader.getBytes());
os.flush();
//String serverResponse = br.readLine();
//System.out.println("服务器响应:" + serverResponse);
doSseReceive(br);
}
}
private static String byteToString(List<Byte> contentBytes) {
byte[] byteArray = new byte[contentBytes.size()];
for (int i = 0; i < contentBytes.size(); i++) {
byteArray[i] = contentBytes.get(i);
}
String str = new String(byteArray);
return str;
}
public static void doSseReceive(BufferedReader br) throws Exception{
List<Byte> sseByteContent = new ArrayList<>();
while (true){
//以读取到换行符为结尾
String dataPacket = br.readLine();
//按照约定、以Json形式解析数据包
JSONObject jsonObject = JSONObject.parseObject(dataPacket);
System.out.println("本次接收到的SSE数据包:" + jsonObject.toJSONString());
//解析length字段
Integer length = jsonObject.getInteger("length");
if (length == 0){
break;
}
byte[] data = jsonObject.getBytes("data");
for (int i = 0; i < data.length; i++) {
sseByteContent.add(data[i]);
}
}
//输出最终结果
System.out.println("SSE响应完整结果:" + byteToString(sseByteContent));
}
}
- SSE效果模拟实现
- 客户端输出效果
2. 服务器输出效果
客户端通过两次接收控制台输入后,发送给了服务器,服务器接收到以后将准备好的数据进行分段发送,每一段发送16个字节,并封装为一个json数据,当发送数据的长度为0时,表示服务器本次发送的数据完成,客户端可以解析了。
客户端解析到最后一个数据包,将所有的字节数据统一转为Json字符串,此时就可以得到服务器发送的原始的完整字节数据了。
- 遵循的协议
在客户端与服务器双方实现通信时,遵循同一种协议是非常重要的,客户端需要知道服务器什么时候发送数据完成,同理,服务器也需要知道客户端发送数据完成的标志,例如在上一个sse案例中,客户端以 \r\n\r\n为发送结束的标志,同理,服务器同样判断接收的最后四个字节数据是否为\r\n\r\n为结束标志,服务器在处理返回时,遵循的输出规范时向客户端每次发送一个json格式的字符串,同时以换行符输出为一个响应的结束,最后一个响应结束为发送0个字节的json字符串,而同样的客户端每次读取也是以换行符为一个读取,知道读到最后一个数据包,再去解析本次的完整数据。
- 通信协议思想
不同的应用软件,例如Mysql、kafka、nacos等在与各种程序或者自己的集群交互时,都制定了类似于上述自定义实现的sse协议一样,双方约定好一次请求与响应的数据格式 开始和结束的标志。那么各个应用程序时间都可以基于Socket实现最底层的数据传输通信。
-
输入输出流的缓冲区
对于服务端或者客户端的输出流来说,其实输出流输出的数据是经过网络协议栈处理后,操作系统调度到了对方的输入流的缓冲队列中,每调用一次read方法,就会读取一个缓冲队列里的字节,如果不读取,则缓冲队列里的字节是一直存在的。输入字节流的缓冲区机制很好的保证了通信双方一直读取到约定好的标志位之后才会认为本次传输的数据是完整的。 -
重点的几个关键词
Socket输入输出端、绑定进程、双方通信协议、标志结束位、输入流缓冲区。
原文地址:https://blog.csdn.net/qq_43750656/article/details/145062646
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!