自学内容网 自学内容网

Netty学习——源码篇2 客户端Bootstrap(一)

1 Channel简介

        在Netty中,Channel相当于一个Socket的抽象,它为用户提供了关于Socket状态(是连接还是断开)以及对Socket的读写等操作。每当Netty建立了一个连接,都创建一个与其对应的Channel实例。

        除了TCP,Netty还支持很多其他的协议,并且每种协议还有NIO和OIO(传统的阻塞IO)版本的区别。不同协议不同阻塞类型的连接都有不同的Channel类型与之对应,下表对一些常用的Channel做了简单介绍。

 

        来看一下Channel的总体类图,如下图:

 

2 NioSocketChannel的创建

        Bootstrap是Netty提供的一个便利的工厂类,可以通过它来完成客户端或者服务端的Netty的初始化。

        首先,从客户端开始分析。 

public class ChatClient {
    public ChatClient connect(int port,String host,final String name){
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            System.out.println("初始化channel:" + socketChannel);
                        }
                    });
            //发起同步连接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //关闭,释放线程资源
            group.shutdownGracefully();
        }
        return this;

    }

    public static void main(String[] args) {
        new ChatClient().connect(8080,"127.0.0.1","jay");
    }
}

         分析如下:

        1、EventLoopGroup:不论是服务端还是客户端,都必须指定EventLoopGroup。在本例中,指定了NioEventLoopGroup,表示一个NIO的EventLoopGroup。

        2、ChannelType:指定Channel的类型。因为是客户端,所以使用了NioSocketChannel。

        3、Handler:设置处理数据的Handler。

        客户端启动Bootstrap后都做了哪些工作?看一下NioSocketChannel的类层次结构图,如下:

        回到在客户端连接代码的初始化Bootstrap中,该方法调用了一个channel方法,传入的参数是NioSocketChannel.class,在这个方法中其实就是初始化了一个ReflectiveChannelFactory的对象,代码实现如下:

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        } else {
            return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
        }
    }

         而ReflectiveChannelFactory实现了ChannelFactory接口,它提供了唯一的方法,即newChannel方法。顾名思义,ChannelFactory就是创建Channel的工厂类。进入ReflectiveChannelFactory的newChannel方法,其实现代码如下:

public T newChannel() {
        try {
            return (Channel)this.clazz.newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
        }
    }

        根据上面的代码,可以得出以下结论。

        1、Bootstrap中的ChannelFactory实现类是ReflectiveChannelFactory。

        2、通过channel方法创建的Channel具体类型是NioSocketChannel。

        Channel的实例化过程其实就是调用ChannelFactory的newChannel方法,而实例化的Channel具体类型又和初始化Bootstrap时传入的channel方法的参数有关。因此对于客户端的Bootstrap而言,创建的Channel实例就是NioSocketChannel。

3 客户端Channel初始化

        上面提到了如何设置一个Channel的类型,并且了解到Channel是通过ChannelFactory的newChannel方法来实例化的,那么ChannelFactory的newChannel方法在哪里调用呢?其调用链路如下图:

        在AbstractBootstrap的initAndRegister方法中,调用ChannelFactory的newChannel方法来创建一个NioSocketChannel的实例,代码如下:

final ChannelFuture initAndRegister() {
        Channel channel = null;

        try {
            channel = this.channelFactory.newChannel();
            this.init(channel);
        } catch (Throwable var3) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }

            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }

        ChannelFuture regFuture = this.config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

        在newChannel方法中,利用反射机制调用类对象newInstance()方法来创建一个新的Channel实例,相当于调用NioSocketChannel的默认构造方法。 NioSocketChannel默认的构造方法代码如下:

public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

        这里的代码比较关键,可以看到,在这个构造器中首先会调用newSocket()方法来打开一个新的Java NIO的SocketChannel对象。

private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a socket.", var2);
        }
    }

        然后调用父类,即AbstractNioByteChannel构造器.

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, 1);
    }

        同时,传入参数,parent的值默认为null,ch为之前调用newSocket()方法创建的Java NIO的SocketChannel对象,因此新创建的NioSocketChannel对象中的parent暂时是null。接着会调用父类的AbstractNioChannel构造器,并传入实际参数readInterestOp=SelectionKey.OP_READ。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;

        try {
            ch.configureBlocking(false);
        } catch (IOException var7) {
            try {
                ch.close();
            } catch (IOException var6) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close a partially initialized socket.", var6);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", var7);
        }
    }

        最后会调用父类AbstractNioChannel的构造器。

        至此,NioSocketChannel就完成了初始化,总结一下NioSocketChannel初始化所做的流程:

        1、调用NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的Java NioSocketChannel。

        2、初始化AbstractChannel对象并给属性赋值,具体赋值的属性如下:

                (1)id:每个Channel都会被分配一个唯一的id。

                (2)parent:属性值默认为null。

                (3)unsafe:通过调用newUnsafe()方法实例化一个Unsafe对象,它的类型是AbsractNioByteChannel.NioByteUnsafe内部类。

                (4)pipeline:是通过调用new DefaultChannelPipeline(this)新创建的实例。

        3、AbstractNIOChannel中被赋值的属性如下:

                (1)ch:被赋值为Java 原生SocketChannel,即NioSocketChannel的newSocket()方法返回的Java NIO SocketChannel。

                (2)readInterestOp:被赋值为SelectionKey.OP_READ。

                (3)ch:被配置为非阻塞,即调用ch.configureBlocking(false)方法

        4、NioSocketChannel中被赋值的属性:config = new NioSocketChannelConfig(this,socket.socket())。

4 ChannelPipeline的初始化

        上面在分析NioSocketChannel的初始化过程中,漏掉了一个关键的部分,即ChannelPipeline的初始化。在实例化一个Channel时,必须要实例化一个ChannelPipeline。在AbstractChannel的构造器中看到了Pipeline属性被初始化为DefaultChannelPipeline的实例。DefaultChannelPipeline构造器的代码如下:

protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }

        DefaultChannelPipeline 的构造器需要传入一个Channel,而这个Channel其实就是实例化的NioSocketChannel对象,DefaultChannelPipeline会将这个  NioSocketChannel对象保存在Channel属性中。DefaultChannelPipeline中还有两个属性是双向链表的头和尾,即Head和Tail。其实在DefaultChannelPipeline中维护了一个以AbstractChannelHandlerContext为节点元素的双向链表,这个链表是NEtty实现Pipeline机制的关键。先看HeadContext的继承层次结构,如下图所示:

        TailContext的继承层次结构图如下:

         可以看到,链表中Head是一个ChannelOutBoundHandler,而Tail是一个ChannelInBoundHandler。接着看HeadContext的构造器代码:

HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
            this.unsafe = pipeline.channel().unsafe();
            this.setAddComplete();
        }

        它调用了父类AbstractChannelHandlerContext的构造器,并传入擦承诺书inbound=false,outbound=true。而TailContext的构造器与HeadContext刚好相反。

5 EventLoop的初始化

        回到最开始ChatClient用户代码中,一开始就实例化了一个NioEventLoopGrouop的对象,因此就从它的构造器中追踪EventLoop的初始化过程。首先来看NioEventLoopGrouop的类继承层次结构图:

        NioEventLoop中有几个重载的构造器,不过内容都没有太大的区别,最终都调用父类MultithreadEventLoopGroup的构造器,代码如下:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

        如果传入的线程数nThreads是0,那么Netty会设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而这个默认的线程数怎么确定的呢?首先确定 DEFAULT_EVENT_LOOP_THREADS的值

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

        Netty首先从系统属性中获取“io.netty.eventLoopThreads”的值,如果没有设置,就返回默认值,即CPU核数*2。回到MultithreadEventLoopGroup构造器中会继续调用父类MultithreadEventExecutorGroup的构造器。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        this.terminatedChildren = new AtomicInteger();
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        } else {
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
            }

            this.children = new EventExecutor[nThreads];

            int j;
            for(int i = 0; i < nThreads; ++i) {
                boolean success = false;
                boolean var18 = false;

                try {
                    var18 = true;
                    this.children[i] = this.newChild((Executor)executor, args);
                    success = true;
                    var18 = false;
                } catch (Exception var19) {
                    throw new IllegalStateException("failed to create a child event loop", var19);
                } finally {
                    if (var18) {
                        if (!success) {
                            int j;
                            for(j = 0; j < i; ++j) {
                                this.children[j].shutdownGracefully();
                            }

                            for(j = 0; j < i; ++j) {
                                EventExecutor e = this.children[j];

                                try {
                                    while(!e.isTerminated()) {
                                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException var20) {
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }

                    }
                }

                if (!success) {
                    for(j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }

                    for(j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];

                        try {
                            while(!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var22) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }

            this.chooser = chooserFactory.newChooser(this.children);
            FutureListener<Object> terminationListener = new FutureListener<Object>() {
                public void operationComplete(Future<Object> future) throws Exception {
                    if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                        MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
                    }

                }
            };
            EventExecutor[] arr$ = this.children;
            j = arr$.length;

            for(int i$ = 0; i$ < j; ++i$) {
                EventExecutor e = arr$[i$];
                e.terminationFuture().addListener(terminationListener);
            }

            Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
            Collections.addAll(childrenSet, this.children);
            this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    }

        继续进入newChooser方法查看实现逻辑,代码如下DefaultEventExecutorChooserFactory:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() {
    }

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors));
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        public EventExecutor next() {
            return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
        }
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        public EventExecutor next() {
            return this.executors[this.idx.getAndIncrement() & this.executors.length - 1];
        }
    }
}

        上面代码主要表达的意思是:如果nThreads是2的平方,则使用PowerOfTowEventExecutorChooser,否则使用GenericEventExecutorChooser。这两个Chooser都重写next()方法。next()方法的主要功能就是将数组索引循环位移,如下图所示:

        当索引移动到最后一个位置时,再调用next方法就会将索引位置重新指向0,如下图所示:

        分析到这里,已经非常清楚 MultithreadEventLoopGroup中的处理逻辑,简单总结如下:

        1、创建一个大小为nThreads的SingleThreadEventExecutor数组。

        2、根据nThreads的大小,创建不同的Chooser,如果nThreads是2的平方,则使用PowerOfTowEventExecutorChooser,否则使用GenericEventExecutorChooser。不论使用哪个Chooser,它们的功能都是一样的,即从children数组中选出一个合适的EventExecutor实例。

        3、调用newChild()方法初始化children数组。

        根据上面代码,知道了MultithreadEventLoopGroup内部维护了一个EventExecutor数组,而Netty的EventLoopGroup的实现机制其实就建立在MultithreadEventLoopGroup之上。每当Netty需要一个EventLoop时,都会调用next方法获取一个可用的EventLoop。

6 将Channel注册到Selector

        前面提到Channel会在Bootstrap的initAndRegister中进行初始化,但是这个方法还会将初始化好的channel注册到NioEventLoop的Selector中。

        当Channel初始化后,紧接着会调用group().register()方法来向Selector注册Channel。继续跟踪,会发现其调用链路如下图所示:

        通过跟踪链路,最终发现在AbstractBootstrap的initAndRegister方法中调用的是Unsafe的register方法,接下来看一下AbstractChannel代码:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            } else if (AbstractChannel.this.isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
            } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
                promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            } else {
                AbstractChannel.this.eventLoop = eventLoop;
                if (eventLoop.inEventLoop()) {
                    this.register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            public void run() {
                                AbstractUnsafe.this.register0(promise);
                            }
                        });
                    } catch (Throwable var4) {
                        AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                        this.closeForcibly();
                        AbstractChannel.this.closeFuture.setClosed();
                        this.safeSetFailure(promise, var4);
                    }
                }

            }
        }

         首先,将EventLoop赋值给Channel的eventLoop属性,我们直到EventLoop对象其实是通过MultithreadEventLoopGroup的next方法获取的,根据前面的分析,可以确定next方法返回的eventLoop对象是NioEventLoop实例。register方法接着调用了register0方法,代码如下:

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
                    return;
                }

                boolean firstRegistration = this.neverRegistered;
                AbstractChannel.this.doRegister();
                this.neverRegistered = false;
                AbstractChannel.this.registered = true;
                AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
                this.safeSetSuccess(promise);
                AbstractChannel.this.pipeline.fireChannelRegistered();
                if (AbstractChannel.this.isActive()) {
                    if (firstRegistration) {
                        AbstractChannel.this.pipeline.fireChannelActive();
                    } else if (AbstractChannel.this.config().isAutoRead()) {
                        this.beginRead();
                    }
                }
            } catch (Throwable var3) {
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var3);
            }

        }

        register0方法又调用了AbsractNioChannel的doRegister方法,代码如下:

protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
                this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }

        看到javaChannel()这个方法,我们在前面就知道,它返回的是一个java NIO的SocketChannel对象。到了这里,就将SocketChannel注册到与eventLoop关联的Selector上了。

        总结一下Channel的注册过程,具体如下:

        1、在AbstractBootstrap的initAndRegister()方法中,通过group().register(channel)调用MultithreadEventLoopGroup的register()方法。

        2、在MultithreadEventLoopGroup的register()方法中,调用next方法获取一个可用的SingleThreadEventLoop,然后调用它的register方法。

        3、在SingleThreadEventLoop的register方法中,调用channel.unsafe().register(this,promise)方法获取Channel的unsafe()底层操作对象,然后调用Unsafe的register方法。

        4、在AbstractUnsafe的register方法中,调用register0方法注册到Channel对象。

        5、在AbstractUnsafe的register0方法中,调用AbstractNioChannel的doRegister方法。

        6、AbstractNioChannel的doRegiter方法通过javaChannel.register(eventLoop().selector,0,this)将Channel对应的Java NIO的SocketChannel注册到一个eventLoop的Selector中,并且将当前Channel作为Attachment与SocketChannel关联。

        总的来说,Channel的注册过程所做的工作就是将Channel与对应的EventLoop进行关联。因此,在Netty中,每个Channel都会关联一个特定的EventLoop,并且这个Channel中的所有I/O操作都是在这个EventLoop中执行的;当关联好Channel和EventLoop后,会继续调用底层的Java NIO的SocketChannel对象的register方法,将底层Java NIO的SocketChannel注册到指定的Selector中。通过这两步,就完成了Netty对Channel的注册过程。

     


原文地址:https://blog.csdn.net/geminigoth/article/details/136604283

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!