自学内容网 自学内容网

简单的springboot 编写Socket服务接口

简单的springboot 编写Socket服务接口

1.需求

我们项目中有部分老接口为票据接口,其中实现为java socket形式进行实现,但是其中大部分信息都是原始公司封装的包进行实现的,想要修改非常费劲,所以此处简单了解了一下socket,自己简单的 编写了两个测试接口,方便以后如果需要自己添加socket接口,可以快速编写。

2. 简单实现

编写的接口为测试接口,整体结构相对简单,主要就是客户端发起一个请求,请求信息前6位为请求串长度,其余为请求的请求体,发送信息到服务端后,服务端使用线程池异步处理信息,最终返回处理之后的响应信息,客户端则接收响应信息,同样的步骤处理响应信息,前6位为响应信息长度,然后解析响应信息即可,因为为简单案例,所以没有进行数据通信加密。

2.1 客户端实现

客户端代码相对简单,直接写入到controller当中了,具体实现代码如下:

package cn.git.controller;

import cn.git.entity.Product;
import cn.git.socket.SocketUtil;
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.net.Socket;

/**
 * @description: Socket测试controller
 * @program: bank-credit-sy
 * @author: lixuchun
 * @create: 2023-03-20
 */
@RestController
@RequestMapping("/socket")
public class SocketController {

/**
 * 异步发送200个请求,模拟多用户
 */
    @GetMapping("/client")
    public String client() {
    // 异步发送200个请求,模拟多用户
        for (int i = 0; i < 200; i++) {
            int finalI = i;
            new Thread(()-> {
                try {
                    // 创建Socket对象
                    Socket socket = new Socket("localhost", 7777);
                    // 设置超时时间
                    socket.setSoTimeout(60000);

                    // 测试产品
                    Product product = new Product();
                    product.setAmount(new BigDecimal(finalI));
                    product.setCycle(12);
                    product.setEndTime("2018-08-08");
                    product.setName("test");
                    product.setRate(new BigDecimal(1));
                    product.setRaised(new BigDecimal(0));

                    // 拼接请求报文
                    String message = JSONObject.toJSONString(product);
                    String reqLengthStr = SocketUtil.leftFixedZero(6, message.length());

                    // 发送请求报文
                    PrintStream out = new PrintStream(socket.getOutputStream());
                    out.println(reqLengthStr.concat(message));

                    // 获取服务端返回的消息长度信息
                    BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    char[] lengthByte = new char[6];
                    in.read(lengthByte);
                    String rspLengthStr = new String(lengthByte);
                    int responseLength = Integer.parseInt(rspLengthStr);

                    // 获取服务端返回的消息体信息
                    char[] responseByte = new char[responseLength];
                    in.read(responseByte);
                    String responseBody = new String(responseByte);

                    // 打印返回结果
                    System.out.println("返回结果为 : ".concat(responseBody));
                    socket.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        return "处理成功啦";
    }

}

2.2 服务端代码

服务端代码相对复杂一些,主要有socket服务初始化,公共线程池,工具类以及接口处理handle类。具体实现如下:

  • socket初始化类

    package cn.git.socket;
    
    import cn.git.mapper.ProductMapper;
    import cn.git.socket.handler.SocketHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * @description: socket接口入口信息
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2023-03-20
     */
    @Slf4j
    @Component
    public class CustomSocketServer {
    
        @Autowired
        private ProductMapper productMapper;
    
    /**
     * 初始化调用接口
     * 
     * 异步启动socket监听服务,端口 7777
     */
        @PostConstruct
        public void socketServerInit() throws IOException {
            new Thread(() -> {
                try {
                    // 监听7777端口
                    log.info("开始启动socket服务信息,端口监听 7777");
                    ServerSocket serverSocket = new ServerSocket(7777);
    
                    // 循环监听
                    while (true) {
                        log.info("等待客户端连接...");
                        Socket clientSocket = serverSocket.accept();
                        ThreadPoolUtil.THREAD_POOL.execute(
                            // 构建handler
                            SocketHandler.builder().clientSocket(clientSocket).productMapper(productMapper).build()
                        );
                        log.info("客户端连接成功,当前连接数:{}", ThreadPoolUtil.THREAD_POOL.getActiveCount());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    
    
  • 通用线程池相关类
    自定义线程池工厂实现如下

    package cn.git.socket;
    
    import cn.hutool.core.util.StrUtil;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 自定义线程池工厂
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2021-12-25
     */
    public class OnlineThreadFactory implements ThreadFactory {
    
        /**
         * 自增线程序号
         */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
    
        /**
         * 线程名称前缀
         */
        private final String threadNamePrefix;
    
        /**
         * 构造方法
         * @param threadNamePrefix 方法前缀
         */
        public OnlineThreadFactory(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix.concat(StrUtil.DASHED);
        }
    
        /**
         * Constructs a new {@code Thread}.  Implementations may also initialize
         * priority, name, daemon status, {@code ThreadGroup}, etc.
         * @param runnable a runnable to be executed by new thread instance
         * @return constructed thread, or {@code null} if the request to
         * create a thread is rejected
         */
        @Override
        public Thread newThread(Runnable runnable) {
            // 设置线程池名称
            Thread thread = new Thread(runnable , threadNamePrefix.concat(StrUtil.toString(threadNumber.getAndIncrement())));
    
            // 设置守护线程
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
    
            // 同意设置程默认优先级 5
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }
    
    

    线程池工具类

    package cn.git.socket;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @description: 线程池工具类
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2022-08-16 10:58:07
     */
    public class ThreadPoolUtil {
    
        /**
         * 线程池线程名称
         */
        private static final String DICS_THREAD_POOL_PREFIX = "DICS-SOCKET";
    
        /**
         * 超时时间 单位毫秒
         */
        private static final int REQ_TIME_OUT = 10 * 1000;
    
        /**
         * 阻塞队列大小
         */
        private static final int QUEUE_SIZE = 200;
    
        /**
         * 核心线程池数量
         */
        private static final int CORE_THREAD_NUM = 5;
    
        /**
         * 最大线程池数量
         */
        private static final int MAX_THREAD_NUM = 20;
    
        /**
         * 线程池构造参数
         */
        public static ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(CORE_THREAD_NUM,
                MAX_THREAD_NUM,
                REQ_TIME_OUT,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE),
                new OnlineThreadFactory(DICS_THREAD_POOL_PREFIX));
    }
    
    
  • 业务处理handle类

    package cn.git.socket.handler;
    
    import cn.git.entity.Product;
    import cn.git.mapper.ProductMapper;
    import cn.git.socket.SocketUtil;
    import cn.hutool.core.util.IdUtil;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import lombok.*;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    /**
     * @description: socket请求处理类
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2023-03-20
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class SocketHandler implements Runnable {
    
        /**
         * 订单信息mapper
         */
        private ProductMapper productMapper;
    
        /**
         * 客户端socket
         */
        private Socket clientSocket;
    
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see Thread#run()
         */
        @SneakyThrows
        @Override
        public void run() {
            // 获取请求数据信息
            System.out.println("接收数据开始处理!");
            BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
    
            // 读取数据前6位,获取请求数据长度
            char[] bodyBytes = new char[6];
            in.read(bodyBytes);
            String dataLengthStr = new String(bodyBytes);
    
            // 获取请求数据信息
            Integer dataLength = Integer.parseInt(dataLengthStr);
            System.out.println("请求数据长度:" + dataLength);
    
            bodyBytes = new char[dataLength];
            in.read(bodyBytes);
            String requestBodyInfo = new String(bodyBytes);
            System.out.println("请求数据:" + requestBodyInfo);
    
            // 请求数据转换为Person对象
            Product product = JSON.parseObject(requestBodyInfo, Product.class);
            product.setId(IdUtil.simpleUUID());
            productMapper.insert(product);
    
            // 响应数据
            String rspJSONInfo = JSONObject.toJSONString(product);
    
            // 响应数据长度标识位 eg: 000667
            String prefixLength = SocketUtil.leftFixedZero(6, rspJSONInfo.length());
    
            // 最终响应数据
            String finalRspInfo = prefixLength.concat(rspJSONInfo);
            System.out.println("响应数据:" + finalRspInfo);
            out.println(finalRspInfo);
        }
    }
    
    
  • socket工具类

    package cn.git.socket;
    
    /**
     * @description: socket工具类
     * @program: bank-credit-sy
     * @author: lixuchun
     * @create: 2023-03-20
     */
    public class SocketUtil {
    
        /**
         * 左补0
         * eg: length = 6, num = 123, return 000123
         *
         * @param length 长度
         * @param num    数字
         * @return
         */
        public static String leftFixedZero(int length, int num) {
            return String.format("%0" + length + "d", num);
        }
    
    }
    
    

3.测试

启动服务,观察socket监听端口 7777 是否正常启动监听,观察如下,socket服务端正常启动监听端口
在这里插入图片描述
开始模拟多客户端调用,请求 http://localhost:8088/socket/client 接口,循环异步发起 200 socket 请求。
在这里插入图片描述
观察后台信息
在这里插入图片描述
观察数据库,发现数据已经正确导入了, 成功插入了 200 条数据信息
在这里插入图片描述


原文地址:https://blog.csdn.net/qq_19342829/article/details/142593138

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!