自学内容网 自学内容网

netty之关闭连接源码分析

写在前面

本文看下netty关闭channel相关源码。

1:前置准备

为了测试,我们需要使用netty源码中examples模块的echoserver和echoclient,但是echoclient因为会不断的发送消息,并不会断开连接,所以,我们需要修改为如下:

package io.netty.example.echo;

public final class EchoClientForDebugClose {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
//                     p.addLast(new EchoClientHandler()); 因为要debug通道close时server端的逻辑,所以这里直接注释
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
//            f.channel().closeFuture().sync(); 因为要debug通道close时server端的逻辑,所以这里直接注释
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

其中代码group.shutdownGracefully();就是客户端关闭通道的代码。当该代码执行后会触发server端的op_read事件,只不过此时读取到的数据量是-1,代表正常的EOF。接着就可以正式开始debug源码了。

2:正戏

首先在server端代码group.shutdownGracefully();和服务端代码io.netty.channel.nio.NioEventLoop#run打断点,首先启动server,此时我们先mute server端代码,接着启动client端代码, client端代码进入debug:
在这里插入图片描述
接着取消server端代码断点mute,就可以开始调试了:
在这里插入图片描述
执行到代码:

// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    i// ...

    try {
        // ...

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop op_accept接收连接事件,op_read读数据事件的话为true,进if执行
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read(); // 读数据
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

继续:

// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator(); // 这是一个自适应大小的分配器
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 用来动态调整buf的实际大小,是一个guess的过程,连续读的多就变大,反之变小
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // ...
            // doReadBytes是真的读数据,lastBytesRead记录读到的总字节数以及当次读到的字节数,以作为动态调整buf大小的依据
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) { // 读到的数据小于0,则说明是客户端关闭连接
                // nothing was read. release the buffer. 数据清理,释放堆外内存
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }
            // ...
        } while (allocHandle.continueReading()); // allocHandle.continueReading() 代表在满足特定条件下会尝试多次重复读取数据
        // ...

        if (close) { // 关闭逻辑处理
            closeOnRead(pipeline);
        }
    } catch (Throwable t) { // 异常关闭,同样是
        // ...
    } finally {
        // ...
    }
}

方法doReadBytes(byteBuf)当返回-1,代表EOF了,即客户端正常关闭了channel,所以接下来的if就为true了,内部执行一些资源释放的工作。主要看方法closeOnRead(pipeline);

// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#closeOnRead
private void closeOnRead(ChannelPipeline pipeline) {
    if (!isInputShutdown0()) {
        if (isAllowHalfClosure(config())) { // 半开,忽略
            shutdownInput();
            pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
        } else {
            close(voidPromise()); // 进一步执行关闭
        }
    } else {
        inputClosedSeenErrorOnRead = true;
        pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
    }
}

看方法close(voidPromise());:

// io.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise, java.lang.Throwable, java.nio.channels.ClosedChannelException, boolean)
private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    // ...
    if (closeExecutor != null) {
        // ...
    } else {
        try {
            // Close the channel and fail the queued messages in all cases.
            doClose0(promise); // do了,真的去关了
        } finally {
            // ...
        }
        if (inFlush0) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            });
        } else {
            fireChannelInactiveAndDeregister(wasActive); // 触发事件了,通道变为不激活,以及取消事件注册
        }
    }
}

方法doClose0(promise);,又do了,看到do就有点莫名兴奋,有没有?:

// io.netty.channel.socket.nio.NioSocketChannel#doClose
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close(); // 本质了执行Java NIO channel的close方法,完成通道关闭
}

javaChannel().close();就已经是JavaNIO的代码了,内部会最终执行如下位置,关闭通道的同时也会取消selection key事件的注册:

// java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
    implCloseSelectableChannel();
    synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
        for (int i = 0; i < count; i++) {
            SelectionKey k = keys[i];
            if (k != null)
                k.cancel();
        }
    }
}

写在后面

参考文章列表

netty之导入源码到idea


原文地址:https://blog.csdn.net/wang0907/article/details/143687372

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!