自学内容网 自学内容网

Netty的粘包/拆包问题的解决之道

问题提出

在使用Netty进行网络通信时,由于网络传输的特性,数据往往无法保持完整的发送或接收。可以把netty的socket通道看成是一段水管,水管里的水是没有分界线的。TCP作为底层传输协议,是不了解上层业务数据的意义。这就会导致数据在传输过程中被拆分成多个小片段,或者多个数据包被合并到一个大的数据包中。为了解决这个问题,Netty提供了一些方法来处理拆包和粘包的问题。

1.消息定长,每个消息都是固定长度,不足字节部分用空格补全。

netty提供了提供了FixedLengthFrameEncoder和FixedLengthFrameDecoder来实现。说实在的,这种解决方式实在简单粗暴,根本不会有产品项目采纳。

2.在包尾插入分割符合,例如回车换行符

netty提供了LineBasedFrameDecoder+StringDecoder组合来支持换行符分割。

其工作原理是LineBasedFrameDecoder先遍历ByteBuf的可读字节,若包含换行符,则以其作为结束位置,将区间的字节作为一个整体字符数组,StringDecoder再将捕获的字符数组转成一个字符串。

若需要采用其他分割符,netty也提供了DelimiterBasedFrameDecode+StringDecoder组合来支持。

3.在消息头增加长度来表示消息的总大小

送端在数据包的前面添加一个消息长度字段,接收端根据消息长度进行解析。Netty提供了LengthFieldPrependerLengthFieldBasedFrameDecoder来处理基于消息长度的粘包。

  ChannelPipeline pipeline = ch.pipeline();
  // 添加LengthFieldBasedFrameDecoder来解决粘包和拆包问题
  pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
  // 添加LengthFieldPrepender来处理粘包和拆包问题
  pipeline.addLast(new LengthFieldPrepender(4));

这个示例中的LengthFieldBasedFrameDecoder使用了以下参数:

  • frameMaxLengh:解码后的最大帧长度,超过这个长度的帧将被丢弃。
  • lengthFieldOffset:长度字段的偏移量,指的是长度字段的起始位置相对于整个数据包的起始位置的偏移量。
  • lengthFieldLength:长度字段的长度,指的是长度字段的字节数。
  • lengthAdjustment:长度字段的调整值,指的是长度字段值需要增加或减少的量。
  • initialBytesToStrip:从解码后的帧中去掉的字节数。

这几个参数比较复杂,源码给了非常详细的例子进行说明,有兴趣的可以通过源码进行阅读。

 4,自定义消息头和消息体

第三种方法虽然使用简单,但无法满足一些需求。对于一个项目来说,可能消息的种类非常多,不能单纯使用消息长度来定义,需要额外一个消息id来表示。那既然项目已定义了自己的私有协议栈来解决粘包问题了,那就不会继续使用LengthFieldBasedFrameDecoder。例如jforgame就使用了完全私有的协议栈解析。

public class DefaultProtocolDecoder extends ByteToMessageDecoder {

    private int maxProtocolBytes = 4096;

    private Logger logger = LoggerFactory.getLogger(DefaultProtocolDecoder.class);

    private MessageFactory messageFactory;

    private MessageCodec messageCodec;

    /**
     * 消息元信息常量,为int类型的长度,表示消息的id
     */
    private final int MESSAGE_META_SIZE = 4;

    public DefaultProtocolDecoder(MessageFactory messageFactory, MessageCodec messageCodec) {
        this.messageFactory = messageFactory;
        this.messageCodec = messageCodec;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        // ----------------protocol pattern-------------------------
        // packetLength | cmd | body
        // int int byte[]
        int length = in.readInt();
        if (length > maxProtocolBytes) {
            logger.error("单包长度[{}]过大,断开链接", length);
            ctx.close();
            return;
        }

        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        final int metaSize = MESSAGE_META_SIZE;
        int cmd = in.readInt();
        byte[] body = new byte[length - metaSize];
        in.readBytes(body);

        Class<?> msgClazz = messageFactory.getMessage(cmd);
        Object msg = messageCodec.decode(msgClazz, body);

        out.add(msg);
    }

}

在本例中私有协议定义中定义了包头(包长度+消息id)和包体。如果buff小于包长度,则继续等待数据到达。通过消息id来定义一个实际的消息类型,再通过消息编解码来读取。可以继续优化的是,因为一般客户端的请求消息都比较小,发生粘包的可能性很大。在这里,我们还可以增加一个for循环来解析更多的请求包。

这里注意两个概念,私有协议指的是包头+包体,而消息编解码指的是包体反序列化成一个真正的业务对象。


原文地址:https://blog.csdn.net/littleschemer/article/details/136559506

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!