自学内容网 自学内容网

BIO、NIO、Netty演化总结之二(手撸一个极简版netty)

之前的一片文章里面总结了一下IO模型的演进(BIO、NIO、Netty演化总结-CSDN博客),里面给了一个示例AsyncNonBlockingServerWithThreadPool,最近想了想,发现这个代码跟netty的模型还是有一些出入,说是netty的雏形好像有点牵强,于是想了一下,还是决定写一个更接近netty的极简版代码,仅供交流,有不对的地方欢迎指正,不喜勿喷,直接上代码

public class MyBossGroup {
    //多路复用器
    private Selector selector;
    private ServerSocketChannel serverChannel;
    //读写处理线程(对应netty里面的worker线程组)
    private MyWorkerGroup[] myWorkerGroups = new MyWorkerGroup[10];
    //计数器,用于从线程组中挑选一个线程来处理事件
    private final AtomicLong idx = new AtomicLong();

    public MyBossGroup(int port) throws IOException {
        // 创建选择器和服务器通道
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);
        // 注册服务器通道到选择器,并注册接收连接事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        for (int i = 0; i < myWorkerGroups.length; i++) {
            myWorkerGroups[i] = new MyWorkerGroup();
        }
    }

    public void start() throws IOException {
        System.out.println("Server started.");
        while (true) {
            // 阻塞等待事件发生
            selector.select();
            // 处理连接事件
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                // 接收连接事件
                handleAccept(key);
            }
        }
    }

    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        //挑选一个线程,将clientChannel绑定到这个线程中去
        MyWorkerGroup myWorkerGroup = myWorkerGroups[(int) Math.abs(idx.getAndIncrement() % myWorkerGroups.length)];
        //已经建立连接的socket交给worker线程组
        myWorkerGroup.register(clientChannel);
        System.out.println("New client connected: " + clientChannel.getRemoteAddress());
    }

    public static void main(String[] args) {
        try {
            MyBossGroup server = new MyBossGroup(8080);
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这个MyBossGroup就是我们在编写netty应用程序的时候的bossgroup的核心逻辑,负责接收客户端连接,并且将连接的socket注册到worker线程组中,下面的MyWorkerGroup就是编写netty应用程序的时候的workergroup的核心逻辑,负责数据的读写:

public class MyWorkerGroup {
    private Selector selector;
    private Thread thread;
    private ByteBuffer buffer;

    public MyWorkerGroup() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        thread = new Thread(new MyRunnable());
        buffer = ByteBuffer.allocate(1024);
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    int select = selector.select();
                    if (0 == select) {
                        TimeUnit.MILLISECONDS.sleep(10);
                        continue;
                    }
                    // 处理连接事件
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        keyIterator.remove();
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        if (key.isReadable()) {
                            buffer.clear();
                            int bytesRead = 0;
                            try {
                                bytesRead = clientChannel.read(buffer);
                            } catch (IOException e) {
                                e.printStackTrace();
                                closeSocketChannel(key, clientChannel);
                                continue;
                            }
                            if (bytesRead == -1) {
                                closeSocketChannel(key, clientChannel);
                                try {
                                    System.out.println("Client disconnected: " + clientChannel.getRemoteAddress());
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                                continue;
                            }
                            buffer.flip();
                            byte[] data = new byte[buffer.remaining()];
                            buffer.get(data);
                            System.out.println("Received message from client: " + new String(data));
                            System.err.println("current_thread:" + Thread.currentThread().getName());
                        } else {
                            closeSocketChannel(key, clientChannel);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void closeSocketChannel(SelectionKey key, SocketChannel socketChannel) {
            try {
                System.out.println("Client disconnected: " + socketChannel.getRemoteAddress());
                // 客户端关闭连接
                key.cancel();
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void register(SocketChannel socketChannel) {
        try {
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
        thread.start();
    }
}

可以看到两个group其核心逻辑都是一个死循环,监听selector里面的事件,只是在netty里面将这两个死循环合并到了一个类里面,也就是NioEventLoop的run方法,每一个NioEventLoop独立维护一个自己的selector和任务队列(在这里没有体现),客户端连接第一次连接过来的socket绑定到一个NioEventLoop之后,后面这个socket的读写事件就全部由这个NioEventLoop负责,这样就有几个好处:

1、selector.select是一个阻塞的方法,由于每一个workergroup独立维护自己的selector,不会相互影响

2、每一个连接的客户端在绑定workergroup的时候实际上就是绑定了一个selector,这样每一个workergroup所负责管理的客户端连接的socket之间也相互不影响


原文地址:https://blog.csdn.net/qq_17805707/article/details/136150263

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!