Netty的粘包/拆包问题的解决之道
问题提出
在使用Netty进行网络通信时,由于网络传输的特性,数据往往无法保持完整的发送或接收。可以把netty的socket通道看成是一段水管,水管里的水是没有分界线的。TCP作为底层传输协议,是不了解上层业务数据的意义。这就会导致数据在传输过程中被拆分成多个小片段,或者多个数据包被合并到一个大的数据包中。为了解决这个问题,Netty提供了一些方法来处理拆包和粘包的问题。
1.消息定长,每个消息都是固定长度,不足字节部分用空格补全。
netty提供了提供了FixedLengthFrameEncoder和FixedLengthFrameDecoder来实现。说实在的,这种解决方式实在简单粗暴,根本不会有产品项目采纳。
2.在包尾插入分割符合,例如回车换行符
netty提供了LineBasedFrameDecoder+StringDecoder组合来支持换行符分割。
其工作原理是LineBasedFrameDecoder先遍历ByteBuf的可读字节,若包含换行符,则以其作为结束位置,将区间的字节作为一个整体字符数组,StringDecoder再将捕获的字符数组转成一个字符串。
若需要采用其他分割符,netty也提供了DelimiterBasedFrameDecode+StringDecoder组合来支持。
3.在消息头增加长度来表示消息的总大小
送端在数据包的前面添加一个消息长度字段,接收端根据消息长度进行解析。Netty提供了LengthFieldPrepender
和LengthFieldBasedFrameDecoder
来处理基于消息长度的粘包。
ChannelPipeline pipeline = ch.pipeline();
// 添加LengthFieldBasedFrameDecoder来解决粘包和拆包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 添加LengthFieldPrepender来处理粘包和拆包问题
pipeline.addLast(new LengthFieldPrepender(4));
这个示例中的LengthFieldBasedFrameDecoder
使用了以下参数:
- frameMaxLengh:解码后的最大帧长度,超过这个长度的帧将被丢弃。
- lengthFieldOffset:长度字段的偏移量,指的是长度字段的起始位置相对于整个数据包的起始位置的偏移量。
- lengthFieldLength:长度字段的长度,指的是长度字段的字节数。
- lengthAdjustment:长度字段的调整值,指的是长度字段值需要增加或减少的量。
- initialBytesToStrip:从解码后的帧中去掉的字节数。
这几个参数比较复杂,源码给了非常详细的例子进行说明,有兴趣的可以通过源码进行阅读。
4,自定义消息头和消息体
第三种方法虽然使用简单,但无法满足一些需求。对于一个项目来说,可能消息的种类非常多,不能单纯使用消息长度来定义,需要额外一个消息id来表示。那既然项目已定义了自己的私有协议栈来解决粘包问题了,那就不会继续使用LengthFieldBasedFrameDecoder。例如jforgame就使用了完全私有的协议栈解析。
public class DefaultProtocolDecoder extends ByteToMessageDecoder {
private int maxProtocolBytes = 4096;
private Logger logger = LoggerFactory.getLogger(DefaultProtocolDecoder.class);
private MessageFactory messageFactory;
private MessageCodec messageCodec;
/**
* 消息元信息常量,为int类型的长度,表示消息的id
*/
private final int MESSAGE_META_SIZE = 4;
public DefaultProtocolDecoder(MessageFactory messageFactory, MessageCodec messageCodec) {
this.messageFactory = messageFactory;
this.messageCodec = messageCodec;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
// ----------------protocol pattern-------------------------
// packetLength | cmd | body
// int int byte[]
int length = in.readInt();
if (length > maxProtocolBytes) {
logger.error("单包长度[{}]过大,断开链接", length);
ctx.close();
return;
}
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
final int metaSize = MESSAGE_META_SIZE;
int cmd = in.readInt();
byte[] body = new byte[length - metaSize];
in.readBytes(body);
Class<?> msgClazz = messageFactory.getMessage(cmd);
Object msg = messageCodec.decode(msgClazz, body);
out.add(msg);
}
}
在本例中私有协议定义中定义了包头(包长度+消息id)和包体。如果buff小于包长度,则继续等待数据到达。通过消息id来定义一个实际的消息类型,再通过消息编解码来读取。可以继续优化的是,因为一般客户端的请求消息都比较小,发生粘包的可能性很大。在这里,我们还可以增加一个for循环来解析更多的请求包。
这里注意两个概念,私有协议指的是包头+包体,而消息编解码指的是包体反序列化成一个真正的业务对象。
原文地址:https://blog.csdn.net/littleschemer/article/details/136559506
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!