自学内容网 自学内容网

Netty笔记05-组件Handler & Pipeline


概述

ChannelHandler

ChannelHandler 是 Netty 中的一个接口,它定义了处理 I/O 事件的方法。ChannelHandler 可以处理各种类型的事件,包括连接事件、读写事件、异常事件等。

方法

入站(Inbound)方法:

  • channelRead(ChannelHandlerContext ctx, Object msg):当从通道接收到数据时调用。
  • channelReadComplete(ChannelHandlerContext ctx):当所有数据都已读取完毕时调用。
  • exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当发生异常时调用。

出站(Outbound)方法:

  • write(ChannelHandlerContext ctx, Object msg, Promise promise):当有数据需要写出时调用。
  • flush(ChannelHandlerContext ctx):当需要刷新写出缓冲区时调用。

ChannelPipeline

ChannelPipeline 是 Netty 中的核心组件之一,它管理了一系列的 ChannelHandler。ChannelPipeline 可以理解为一个责任链模式的实现,它按照顺序处理事件,每个 ChannelHandler 负责处理特定类型的事件,并可以选择将事件传递给下一个 ChannelHandler 或者停止处理。

特点

  • 有序性:ChannelPipeline 中的 ChannelHandler 是按顺序排列的,可以插入、删除或替换。
  • 双向性:ChannelPipeline 支持双向处理,即可以处理入站(Inbound)事件,也可以处理出站(Outbound)事件。
  • 灵活性:可以动态地添加、删除或替换 ChannelHandler,而无需重新启动应用程序。

总结

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

代码示例

服务器端

在服务器端中添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class PipelineTest {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 1. 通过 channel 拿到 pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        // 2. 添加处理器 head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                        //addLast()会加在tail之前,而不是最后。底层是双向链表
                        //入站处理器
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                //处理加工msg
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());
                                //将处理好的字符串传递给h2
                                super.channelRead(ctx, name);
                            }
                        });
                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                                log.debug("2");
                                Student student = new Student(name.toString());
                                super.channelRead(ctx, student);
                                // 调用super.channelRead()将数据传递给下个 handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);
                            }
                        });

                        //出战处理器。注意:ctx.writeAndFlush()和ch.writeAndFlush()都会走该出站处理器
                        pipeline.addLast("h2.5", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("2.5");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3,h2传递的数据为{},class为{}",msg,msg.getClass());
                                //super.channelRead(ctx, msg);//channelRead()是将控制权交给pipeline中下一个入站处理器
                                //这里h3后面已经没有入站处理器了,所以可以不用调用channelRead()

                                //向channel中写入数据触发出站处理器(ch.writeAndFlush()会从尾节点向前找出站处理器)
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

                                //ctx.writeAndFlush()是从当前处理器向前寻找出站处理器
//                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

                            }
                        });
                        //出站处理器 p2.25
                        //注意:出站处理器只有向channel中写入数据才会触发
                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
    @Data
    @AllArgsConstructor
    static class Student {
        private String name;
    }
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author qf
 * @since 2024/09/12 19:44
 */
public class Client {
    public static void main(String[] args) {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("127.0.0.1", 8080)
                .addListener((ChannelFutureListener) future -> {
                    future.channel().writeAndFlush("hello,world");
                });
    }
}

服务器端输出

18:45:18.287 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 1
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 3,h2传递的数据为PipelineTest.Student(name=hello,world),classclass com.qf.netty.Pipeline.PipelineTest$Student
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 6
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 5
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 4
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2.5

以上代码可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
在这里插入图片描述

  • 入站处理器中,super.channelRead(ctx, name)或使用ctx.fireChannelRead(msg) 是 调用下一个入站处理器

    • 如果注释掉 h1中 super.channelRead(ctx, name)代码,则仅会打印 1
    • 如果注释掉 h2中 super.channelRead(ctx, student)代码,则仅会打印 1 2
  • h3 处的 ch.writeAndFlush() 会 从尾部开始触发 后续出站处理器的执行

    • 如果注释掉 h3 处 ch.writeAndFlush() 代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,super.write(ctx, msg, promise)或ctx.write(msg, promise) 的调用也会 触发上一个出站处理器

    • 如果注释掉 h6 处 super.write(ctx, msg, promise) 代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)

    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己(死循环)
      在这里插入图片描述
      服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

EmbeddedChannel

EmbeddedChannel 是 netty提供的专门用来测试的channel。
使用EmbeddedChannel进行测试可以不启动服务器和客户端了。

EmbeddedChannel 的方法

  • writeInbound(Object msg):将入站消息写到 EmbeddedChannel 中。
  • writeOutbound(Object msg):将出站消息写到 EmbeddedChannel 中。
  • readInbound():从 EmbeddedChannel 中读取入站消息。
  • readOutbound():从 EmbeddedChannel 中读取出站消息。
  • finish():完成所有未完成的写操作,并关闭 EmbeddedChannel

@Slf4j
public class Test06EmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站操作
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        /*
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 1
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 2
         */
        // 模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
        /*
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 4
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 3
         */
    }
}


全部文章:
Netty笔记01-Netty的基本概念与用法
Netty笔记02-组件EventLoop
Netty笔记03-组件Channel
Netty笔记04-组件Future & Promise
Netty笔记05-组件Handler & Pipeline


原文地址:https://blog.csdn.net/weixin_46425661/article/details/142177575

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!