自学内容网 自学内容网

Spingboot整合Netty,简单示例

Netty介绍在文章末尾  Netty介绍

项目背景

传统socket通信,有需要自身管理整个状态,业务繁杂等问题。

pom.xml

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.117.Final</version>
</dependency>

代码

封装Netty的服务类

package com.example.server;

import com.example.exception.ServiceException;
import com.example.handler.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {
// 用于处理连接请求的线程组
private EventLoopGroup bossGroup;
// 用于处理I/O操作的线程组
private EventLoopGroup workerGroup;
// 服务器
private ChannelFuture channelFuture;

/**
 * 启动 Netty 服务端
 *
 * @param port 监听的端口
 */
public void start(int port) {
bossGroup = new NioEventLoopGroup(); // 初始化bossGroup
workerGroup = new NioEventLoopGroup(); // 初始化workerGroup

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用NIO传输
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 添加编解码器和处理器
ch.pipeline().addLast(new NettyServerInitializer());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置TCP参数
.childOption(ChannelOption.SO_KEEPALIVE, true);

// 绑定端口并启动服务器
channelFuture = bootstrap.bind(port).sync();
log.info("Netty 服务端启动成功,监听端口: {}", port);
} catch (InterruptedException e) {
log.error("Netty 服务端启动失败,端口: {}, 异常: {}", port, e.getMessage(), e);
throw new ServiceException("Netty 服务端启动异常");
} catch (Exception e) {
log.error("绑定端口时发生异常: {}", e.getMessage(), e);
throw new ServiceException("端口绑定异常");
}
}

/**
 * 停止 Netty 服务端
 */
public void stop() {
if (channelFuture != null) {
channelFuture.channel().close();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
System.out.println("Netty 服务端已关闭");
}


}

封装Netty服务的管理类

用来创建和销毁多个Netty

package com.example.server;

import com.example.exception.ServiceException;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class NettyServerManager {
private final ConcurrentHashMap<Integer, NettyServer> nettyServers = new ConcurrentHashMap<>();

/**
 * 启动 Netty 服务端
 *
 * @param port 监听的端口
 */
public void startNettyServer(int port) {
if (nettyServers.containsKey(port)) {
throw new ServiceException("端口 " + port + " 已被占用");
}

NettyServer nettyServer = new NettyServer();
nettyServer.start(port);
nettyServers.put(port, nettyServer);
}

/**
 * 停止 Netty 服务端
 *
 * @param port 监听的端口
 */
public void stopNettyServer(int port) {
NettyServer nettyServer = nettyServers.get(port);
if (nettyServer != null) {
nettyServer.stop();
nettyServers.remove(port);
}
}

/**
 * 获取所有 Netty 服务端实例
 */
public Map<Integer, NettyServer> getNettyServers() {
return nettyServers;
}
}

NettyServer的初始化

package com.example.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
// HTTP 编解码器:支持 HTTP 请求和响应
.addLast(new HttpServerCodec())
// 聚合 HTTP 消息,避免处理分块的 HTTP 请求
.addLast(new HttpObjectAggregator(65536))
// WebSocket 协议处理器,确保路径 `/websocket` 处理 WebSocket 握手和控制帧
.addLast(new WebSocketServerProtocolHandler("/websocket"))
// 字符串解码器,将字节流解析为字符串
.addLast(new StringDecoder(CharsetUtil.UTF_8))
// 字符串编码器,将字符串转换为字节流
.addLast(new StringEncoder(CharsetUtil.UTF_8))
// 自定义消息处理器,用于处理业务逻辑
.addLast(new WebSocketFrameHandler());
}
}

handler处理器类

用来处理具体的消息

package com.example.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WebSocketFrameHandler extends ChannelInboundHandlerAdapter {
@Resource
private UserService userService;


@PostConstruct
public void init() {
webSocketFrameHandler = this;
}


/**
 * 当 Channel 变为活跃状态(连接建立)时调用
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.warn("{} 已连接", ctx.channel().remoteAddress());
}

/**
 * 当 Channel 变为不活跃状态(连接关闭)时调用
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.warn("{} 已断开", ctx.channel().remoteAddress());
}

/**
 * 当从 Channel 中读取到数据时调用
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

}

/**
 * 当一次数据读取完成时调用
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// log.info("Channel 数据读取完成");
ctx.flush();
}

/**
 * 当处理过程中发生异常时调用
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("处理过程中发生异常,连接关闭: {}", cause.getMessage(), cause);
ctx.close();
}
}

消息传递实体

        这里分享一个我用来消息传递的实体,这个实体需要被序列化传递。因为Websocket传递都是文本的形式


Netty介绍

Netty 是一个异步事件驱动的网络应用框架,主要用于快速开发高性能、高可靠性的网络服务器和客户端。它简化了TCP、UDP等协议的编程,广泛应用于WebSocket等。

特性

异步非阻塞:基于事件驱动模型,支持高并发连接。

高性能:优化了网络通信,资源利用率高。

核心组件

  1. Channel:网络通信的管道,支持多种I/O操作。
  2. EventLoop:事件循环,处理I/O事件。
  3. ChannelHandler:处理I/O事件或拦截操作。
  4. ChannelPipeline:包含多个ChannelHandler,处理入站和出站事件。
  5. ByteBuf:高效的字节容器,优于JDK的ByteBuffer

原文地址:https://blog.csdn.net/qq_36920766/article/details/145292351

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!