Netty的高级用法(一)
前言
我们直到在网络通信中客户端和服务端之间除了要传输数据外,还会进行简单的心跳应答通信,使得客户端和服务端的连接处于一种活跃状态,那么客户端可以发送ONE_WAY和TWO_WAY两种方式的处理,而服务端在处理这两种类型的数据时会做出不同的应答,对于ONE_WAY形式的应答,有可能会交由异步线程池来执行,而对于TWO_WAY形式的消息,则是立刻做出回应,除了这些,还会牵扯到序列化和反序列化、数据加密验证的问题,因为网络通信中数据是二进制流的形式传输的,这其中会牵扯到粘包/半包的问题,以及序列化和反序列性能问题。在解决这些基本组件之后,服务端还可以对于客户端进行认证,不在白名单的客户端不接受连接。再者就是针对客户端的输入,服务端做出不同的应答。接着给大家展示一个实例,希望可以帮到大家.
先导入以下maven依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>com.itextpdf</groupId>
<artifactId>itextpdf</artifactId>
<version>5.5.8</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.49</version>
<type>jar</type>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.49</version>
<type>jar</type>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.42</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
1.Server
package adv;
import adv.server.ServerInit;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyServer {
private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
public void bind() throws InterruptedException {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1,
new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors(),
new DefaultThreadFactory("nt_worker"));
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerInit());
// 绑定端口,同步等待成功
ChannelFuture channelFuture = b.bind(Constant.DEFAULT_PORT).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("绑定成功。。。。" + future.toString());
}
});
LOG.info("Netty server start :" + Constant.DEFAULT_SERVER_IP + " :" + Constant.DEFAULT_PORT);
}
public static void main(String[] args) throws InterruptedException {
new NettyServer().bind();
}
}
2.ServerPipeline
package adv.server;
import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import adv.server.asyncpro.DefaultTaskProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class ServerInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 粘包半包问题
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
0, 2,0,2));
ch.pipeline().addLast(new LengthFieldPrepender(2));
// 序列化相关
ch.pipeline().addLast(new KryoDecoder());
ch.pipeline().addLast(new KryoEncoder());
// 处理心跳超时
ch.pipeline().addLast(new ReadTimeoutHandler(15));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast(new HeartBeatRespHandler());
ch.pipeline().addLast(new ServerBusinessHandler(new DefaultTaskProcessor()));
}
}
3.服务端业务处理
package adv.server;
import adv.server.asyncpro.AsyncBusinessProcess;
import adv.server.asyncpro.ITaskProcessor;
import adv.vo.EncryptUtils;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerBusinessHandler extends SimpleChannelInboundHandler<MyMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ServerBusinessHandler.class);
private ITaskProcessor taskProcessor;
public ServerBusinessHandler(ITaskProcessor taskProcessor) {
super();
this.taskProcessor = taskProcessor;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
// 检查MD5
String headMd5 = msg.getMsgHeader().getMd5();
String calcMd5 = EncryptUtils.encryptObj(msg.getBody());
if (!headMd5.equals(calcMd5)) {
LOG.error("报文MD5检查不通过:" + headMd5 + "vs" + calcMd5 + ", 关闭连接");
ctx.writeAndFlush(buildBusinessResp("报文MD5检查不通过,关闭连接"));
ctx.close();
}
LOG.info(msg.toString());
if (msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()) {
LOG.debug("ONE_WAY类型消息,异步处理");
AsyncBusinessProcess.submitTask(taskProcessor.execAsyncTask(msg));
} else {
LOG.debug("TWO_WAY类型消息,应答");
ctx.writeAndFlush(buildBusinessResp("OK"));
}
}
private MyMessage buildBusinessResp(String result) {
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.SERVICE_RESP.value());
myMessage.setMsgHeader(msgHeader);
myMessage.setBody(result);
return myMessage;
}
}
4.安全中心
package adv.server;
import constant.Constant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
public class SecurityCenter {
// 用以检查用户是否重复登录的缓存
private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();
// 用户登录的白名单
private static Set<String> whiteList = new CopyOnWriteArraySet<>();
static {
whiteList.add(Constant.DEFAULT_SERVER_IP);
}
public static boolean isWhiteIP(String ip) {
return whiteList.contains(ip);
}
public static boolean isDupLog(String usrInfo) {
return nodeCheck.containsKey(usrInfo);
}
public static void addLoginUser(String usrInfo) {
nodeCheck.put(usrInfo, true);
}
public static void removeLoginUser(String usrInfo) {
nodeCheck.remove(usrInfo, true);
}
}
5.登录认证
package adv.server;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(LoginAuthRespHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
MyMessage message = (MyMessage) msg;
// 是不是握手认证请求
if (message.getMsgHeader() != null
&& message.getMsgHeader().getType() == MessageType.LOGIN_REQ.value()) {
LOG.info("收到客户端认证请求 :" + message);
String nodeIndex = ctx.channel().remoteAddress().toString();
MyMessage loginResp = null;
boolean checkAutuPass = false;
// 重复登录,拒绝,这里用客户端的地址代替了实际的用户信息
if (SecurityCenter.isDupLog(nodeIndex)) {
loginResp = buildResponse((byte) -1);
LOG.warn("拒绝重复登录, 应答消息:" + loginResp);
ctx.writeAndFlush(loginResp);
ctx.close();
} else {
// 检查用户是否在白名单中,在则允许登录,并写入缓存
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = address.getAddress().getHostAddress();
if (SecurityCenter.isWhiteIP(ip)) {
SecurityCenter.addLoginUser(nodeIndex);
loginResp = buildResponse((byte) 0);
LOG.info("认证通过,应答消息:" + loginResp);
ctx.writeAndFlush(loginResp);
} else {
loginResp = buildResponse((byte) -1);
LOG.warn("认证失败, 应答信息 :" + loginResp);
ctx.writeAndFlush(loginResp);
ctx.close();
}
}
ReferenceCountUtil.release(msg);
} else {
ctx.fireChannelRead(msg);
}
}
private MyMessage buildResponse(byte result) {
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.LOGIN_RESP.value());
myMessage.setMsgHeader(msgHeader);
myMessage.setBody(result);
return myMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());
ctx.close();
}
}
6.心跳处理
package adv.server;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(HeartBeatRespHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
MyMessage message = (MyMessage)msg;
// 是不是心跳请求
if (message.getMsgHeader() != null
&& message.getMsgHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
// 心跳应答报文
MyMessage heartBeatResp = buildHeartBeat();
LOG.debug("心跳应答: " + heartBeatResp);
ctx.writeAndFlush(heartBeatResp);
ReferenceCountUtil.release(msg);
} else {
ctx.fireChannelRead(msg);
}
}
private MyMessage buildHeartBeat() {
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.HEARTBEAT_RESP.value());
myMessage.setMsgHeader(msgHeader);
return myMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
LOG.warn("客户端长时间未通信,可能已经宕机,关闭链路");
SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());
ctx.close();
}
super.exceptionCaught(ctx, cause);
}
}
7.ONE_WAY/TWO_WAY处理
package adv.server.asyncpro;
import io.netty.util.NettyRuntime;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncBusinessProcess {
private static ExecutorService executorService =
new ThreadPoolExecutor(1,
NettyRuntime.availableProcessors(),
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000));
public static void submitTask(Runnable task) {
executorService.execute(task);
}
}
package adv.server.asyncpro;
import adv.vo.MyMessage;
// 消息转任务处理器
public interface ITaskProcessor {
Runnable execAsyncTask(MyMessage msg);
}
package adv.server.asyncpro;
import adv.vo.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultTaskProcessor implements ITaskProcessor{
private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskProcessor.class);
@Override
public Runnable execAsyncTask(MyMessage msg) {
Runnable task = new Runnable() {
@Override
public void run() {
LOG.info("DefaultTaskProcessor模拟任务处理:" + msg.getBody());
}
};
return task;
}
}
8.实体类以及加密
package adv.vo;
public class MyMessage {
private MsgHeader msgHeader;
private Object body;
public MsgHeader getMsgHeader() {
return msgHeader;
}
public void setMsgHeader(MsgHeader msgHeader) {
this.msgHeader = msgHeader;
}
public Object getBody() {
return body;
}
public void setBody(Object body) {
this.body = body;
}
@Override
public String toString() {
return "MyMessage{" +
"msgHeader=" + msgHeader +
", body=" + body +
'}';
}
}
package adv.vo;
import java.util.HashMap;
import java.util.Map;
// 消息头
public class MsgHeader {
// 消息体的MD5摘要
private String md5;
// 消息ID,因为是同步处理模式,不考虑应答消息需要填入请求消息ID
private long msgId;
// 消息类型
private byte type;
// 消息优先级
private byte priority;
private Map<String, Object> attachment = new HashMap<>();
public String getMd5() {
return md5;
}
public void setMd5(String md5) {
this.md5 = md5;
}
public long getMsgId() {
return msgId;
}
public void setMsgId(long msgId) {
this.msgId = msgId;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getPriority() {
return priority;
}
public void setPriority(byte priority) {
this.priority = priority;
}
public Map<String, Object> getAttachment() {
return attachment;
}
public void setAttachment(Map<String, Object> attachment) {
this.attachment = attachment;
}
@Override
public String toString() {
return "MsgHeader{" +
"md5='" + md5 + '\'' +
", msgId=" + msgId +
", type=" + type +
", priority=" + priority +
", attachment=" + attachment +
'}';
}
}
package adv.vo;
public enum MessageType {
SERVICE_REQ((byte) 0), // 业务请求消息
SERVICE_RESP((byte) 1), // TWO_WAY消息,需要业务应答
ONE_WAY((byte) 2), // 无需应答的业务请求消息
LOGIN_REQ((byte) 3), // 登录请求消息
LOGIN_RESP((byte) 4), // 登录响应消息
HEARTBEAT_REQ((byte) 5), // 心跳请求消息
HEARTBEAT_RESP((byte) 6), // 心跳应答消息
;
private byte value;
MessageType (byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
}
package adv.vo;
import java.util.concurrent.atomic.AtomicLong;
public class MakeMsgId {
private static AtomicLong msgId = new AtomicLong(1);
public static long getID() {
return msgId.getAndIncrement();
}
}
package adv.vo;
import adv.kryocodec.KryoSerializer;
import java.security.MessageDigest;
public class EncryptUtils {
private static String EncryptStr(String strSrc, String encName) {
MessageDigest md = null;
String strDes = null;
byte[] bt = strSrc.getBytes();
try {
if (encName == null || encName.equals("")) {
encName = "MD5";
}
md = MessageDigest.getInstance(encName);
md.update(bt);
strDes = bytes2Hex(md.digest());
} catch (Exception e) {
System.out.println("Invalid algorithm.");
return null;
}
return strDes;
}
/**
* MD5 摘要
* @param str 需要被摘要的字符串
* @return 对字符串str进行MD5摘要后,将摘要字符串返回
*/
public static String EncryptByMD5(String str) {
return EncryptStr(str, "MD5");
}
/**
* SHA1摘要
* @param str 需要被摘要的字符串
* @return 对字符串str进行SHA-1摘要后,将摘要字符串返回
*/
public static String EncryptBySHA1(String str) {
return EncryptStr(str, "SHA-1");
}
/**
* SHA256摘要
* @param str 需要被摘要的字符串
* @return 对字符串str进行SHA-256摘要后,将摘要字符串返回
*/
public static String EncryptBySHA256(String str) {
return EncryptStr(str, "SHA-256");
}
/**
* 字节转十六进制,结果以字符串形式呈现
*/
private static String bytes2Hex(byte[] bts) {
String des = "";
String tmp = null;
for (int i = 0; i < bts.length; i++) {
tmp = (Integer.toHexString(bts[i] & 0xFF));
if (tmp.length() == 1) {
des += "0";
}
des += tmp;
}
return des;
}
/**
* 对字符串进行MD5加盐摘要 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的
* 1,3,5个字符追加到摘要串,再拿这个摘要串再次进行摘要
*/
private static String encrypt(String str) {
String encryptStr = EncryptByMD5(str);
if (encryptStr != null) {
encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);
encryptStr = EncryptByMD5(encryptStr);
}
return encryptStr;
}
public static String encryptObj(Object o) {
return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));
}
}
9.Kryo序列化/反序列化
package adv.kryocodec;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class KryoSerializer {
private static Kryo kryo = KryoFactory.createKryo();
// 序列化
public static void serialize(Object object, ByteBuf out) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo.writeClassAndObject(output, object);
output.flush();
output.close();
byte[] b = baos.toByteArray();
try {
baos.flush();
baos.close();
} catch (IOException e) {
e.printStackTrace();
}
out.writeBytes(b);
}
// 序列化为一个字节数组,主要用在消息摘要上
public static byte[] obj2Bytes(Object object) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo.writeClassAndObject(output, object);
output.flush();
output.close();
byte[] b = baos.toByteArray();
try {
baos.flush();
baos.close();
} catch (IOException e) {
e.printStackTrace();
}
return b;
}
public static Object deserialize(ByteBuf out) {
if (out == null) {
return null;
}
Input input = new Input(new ByteBufInputStream(out));
return kryo.readClassAndObject(input);
}
}
package adv.kryocodec;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;
import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
public class KryoFactory {
public static Kryo createKryo() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false);
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
kryo.register(InvocationHandler.class, new JdkProxySerializer());
kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
kryo.register(Pattern.class, new RegexSerializer());
kryo.register(BitSet.class, new BitSetSerializer());
kryo.register(URI.class, new URISerializer());
kryo.register(UUID.class, new UUIDSerializer());
UnmodifiableCollectionsSerializer.registerSerializers(kryo);
SynchronizedCollectionsSerializer.registerSerializers(kryo);
kryo.register(HashMap.class);
kryo.register(ArrayList.class);
kryo.register(LinkedList.class);
kryo.register(HashSet.class);
kryo.register(TreeSet.class);
kryo.register(Hashtable.class);
kryo.register(Date.class);
kryo.register(Calendar.class);
kryo.register(ConcurrentHashMap.class);
kryo.register(SimpleDateFormat.class);
kryo.register(GregorianCalendar.class);
kryo.register(Vector.class);
kryo.register(BitSet.class);
kryo.register(StringBuffer.class);
kryo.register(StringBuilder.class);
kryo.register(Object.class);
kryo.register(Object[].class);
kryo.register(String[].class);
kryo.register(byte[].class);
kryo.register(char[].class);
kryo.register(int[].class);
kryo.register(float[].class);
kryo.register(double[].class);
return kryo;
}
}
package adv.kryocodec;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class KryoEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {
KryoSerializer.serialize(msg, out);
ctx.flush();
}
}
package adv.kryocodec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
// 反序列化的Handler
public class KryoDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
Object obj = KryoSerializer.deserialize(in);
out.add(obj);
}
}
package adv.kryocodec;
import adv.vo.EncryptUtils;
import adv.vo.MakeMsgId;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.Map;
public class TestKryoCodeC {
public MyMessage getMessage(int j) {
// String content = "abcdefg-----------AAAAAAAAAA" + j;
String content = "abcdefg-----------AAAAAA" + j;
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setMsgId(MakeMsgId.getID());
msgHeader.setType((byte) 1);
msgHeader.setPriority((byte) 7);
msgHeader.setMd5(EncryptUtils.encryptObj(content));
Map<String, Object> attachment = new HashMap<>();
for (int i = 0; i < 10; i++) {
attachment.put("city --> " + i, "cover " + i);
}
msgHeader.setAttachment(attachment);
myMessage.setMsgHeader(msgHeader);
myMessage.setBody(content);
return myMessage;
}
public static void main(String[] args) {
TestKryoCodeC testC = new TestKryoCodeC();
for (int i = 0; i < 5; i++) {
ByteBuf sendBuf = Unpooled.buffer();
MyMessage message = testC.getMessage(i);
System.out.println("Encode:" + message);
KryoSerializer.serialize(message, sendBuf);
MyMessage decodeMsg = (MyMessage)KryoSerializer.deserialize(sendBuf);
System.out.println("Decode:" + decodeMsg);
System.out.println("-----------------------------------------------");
}
}
}
10.业务实体类
package adv.business;
import serializable.msgpack.UserContact;
public class User {
private String id;
private String userName;
private int age;
private UserContact userContact;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public UserContact getUserContact() {
return userContact;
}
public void setUserContact(UserContact userContact) {
this.userContact = userContact;
}
public User() {
}
public User(String id, String userName, int age) {
this.id = id;
this.userName = userName;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id='" + id + '\'' +
", userName='" + userName + '\'' +
", age=" + age +
", userContact=" + userContact +
'}';
}
}
package adv.business;
public class UserContact {
private String mail;
private String phone;
public UserContact() {
}
public UserContact(String mail, String phone) {
this.mail = mail;
this.phone = phone;
}
public String getMail() {
return mail;
}
public void setMail(String mail) {
this.mail = mail;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
@Override
public String toString() {
return "UserContact{" +
"mail='" + mail + '\'' +
", phone='" + phone + '\'' +
'}';
}
}
11.ClientPipeline()
package adv.client;
import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class ClientInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 连接写空闲检测
ch.pipeline().addLast(new CheckWriteIdleHandler());
// 粘包半包
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
0,2,0,
2));
ch.pipeline().addLast(new LengthFieldPrepender(2));
// 序列化相关
ch.pipeline().addLast(new KryoDecoder());
ch.pipeline().addLast(new KryoEncoder());
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new LoginAuthReqHandler());
// 连续读空闲检测
ch.pipeline().addLast(new ReadTimeoutHandler(15));
// 向服务器发出心跳请求
ch.pipeline().addLast(new HeartBeatReqHandler());
ch.pipeline().addLast(new ClientBusiHandler());
}
}
12.客户端认证
package adv.client;
import adv.NettyServer;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发出认证请求
MyMessage loginMsg = buildLoginReq();
LOG.info("请求服务器认证:" + loginMsg);
ctx.writeAndFlush(loginMsg);
// super.channelActive(ctx);
}
private MyMessage buildLoginReq() {
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.LOGIN_REQ.value());
myMessage.setMsgHeader(msgHeader);
return myMessage;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
MyMessage message = (MyMessage) msg;
if (message.getMsgHeader() != null
&& message.getMsgHeader().getType() == MessageType.LOGIN_RESP.value()) {
LOG.info("收到认证应答报文,服务器是否验证通过?....");
byte loginResult = (byte) message.getBody();
if (loginResult != (byte)0) {
// 握手成功,关闭连接
LOG.warn("未通过认证,关闭连接: " + message);
ctx.close();
} else {
LOG.info("通过认证, 移除本处理器, 进入业务通信 :" + message);
ctx.pipeline().remove(this);
ReferenceCountUtil.release(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
}
13.客户端心跳
package adv.client;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 客户端在长久未向服务器业务请求时,发出心跳请求报文
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
MyMessage heartBeat = buildHeartBeat();
LOG.debug("写空闲,发出心跳报文维持连接: " + heartBeat);
ctx.writeAndFlush(heartBeat);
}
super.userEventTriggered(ctx, evt);
}
private MyMessage buildHeartBeat() {
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setType(MessageType.HEARTBEAT_REQ.value());
myMessage.setMsgHeader(msgHeader);
return myMessage;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
MyMessage message = (MyMessage)msg;
if (message.getMsgHeader() != null
&& message.getMsgHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
LOG.debug("收到服务器心跳应答, 服务器正常");
ReferenceCountUtil.release(msg);
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
LOG.warn("服务器长时间未应答, 关闭链路");
}
super.exceptionCaught(ctx, cause);
}
}
14.客户端检查写空闲
package adv.client;
import io.netty.handler.timeout.IdleStateHandler;
public class CheckWriteIdleHandler extends IdleStateHandler {
public CheckWriteIdleHandler () {
super(0,8,0);
}
}
15.客户端业务处理
package adv.client;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientBusiHandler extends SimpleChannelInboundHandler<MyMessage> {
private static final Logger LOG = LoggerFactory.getLogger(ClientBusiHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
LOG.info("业务应答消息: " + msg.toString());
}
}
16.客户端启动
package adv;
import adv.client.ClientInit;
import adv.vo.*;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class NettyClient implements Runnable{
private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
// 负责重连的线程池
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private Channel channel;
private EventLoopGroup group = new NioEventLoopGroup();
// 是否用户主动关闭连接的标志
private volatile boolean userClose = false;
// 连接是否成功关闭的标志
private volatile boolean connected = false;
public boolean isConnected() {
return connected;
}
public void connect(int port, String host) throws InterruptedException {
try {
// 客户端启动必备
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class) // 指定使用NIO的通信模式
.handler(new ClientInit());
ChannelFuture future = b.connect(new InetSocketAddress(host, port)).sync();
LOG.info("已连接服务器");
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
LOG.info("连接事件产生回调....operationCompletable");
}
});
channel = future.channel();
synchronized (this) {
this.connected = true;
this.notifyAll();
}
channel.closeFuture().sync();
} finally {
if (!userClose) {
// 非正常关闭,有可能发生了网络问题
LOG.warn("需要进行重连");
executorService.execute(() -> {
try {
// 给操作系统足够的时间取释放相关的资源
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
// throw new RuntimeException(e);
}
});
} else {
// 正常关闭
channel = null;
group.shutdownGracefully().sync();
synchronized (this) {
this.connected = false;
this.notifyAll();
}
}
}
}
@Override
public void run() {
try {
connect(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendOneWay(Object message) throws IllegalAccessException {
if (channel == null || !channel.isActive()) {
throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");
}
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setMsgId(MakeMsgId.getID());
msgHeader.setType(MessageType.ONE_WAY.value());
msgHeader.setMd5(EncryptUtils.encryptObj(message));
myMessage.setMsgHeader(msgHeader);
myMessage.setBody(message);
channel.writeAndFlush(myMessage);
}
public void send(Object message) throws IllegalAccessException {
if (channel == null || !channel.isActive()) {
throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");
}
MyMessage myMessage = new MyMessage();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setMsgId(MakeMsgId.getID());
msgHeader.setType(MessageType.SERVICE_REQ.value());
msgHeader.setMd5(EncryptUtils.encryptObj(message));
myMessage.setMsgHeader(msgHeader);
myMessage.setBody(message);
channel.writeAndFlush(myMessage);
}
public void close() {
userClose = true;
channel.close();
}
}
package adv;
import adv.business.User;
import serializable.msgpack.UserContact;
import java.util.Scanner;
public class BusiClient {
public static void main(String[] args) throws InterruptedException, IllegalAccessException {
NettyClient nettyClient = new NettyClient();
new Thread(nettyClient).start();
while (!nettyClient.isConnected()) {
synchronized (nettyClient) {
nettyClient.wait();
}
}
System.out.println("网络通信已准备好,可以进行业务操作了。。。。。");
Scanner scanner = new Scanner(System.in);
while (true) {
String msg = scanner.next();
if (msg == null) {
break;
} else if ("q".equals(msg.toLowerCase())) {
nettyClient.close();
scanner.close();
while (nettyClient.isConnected()) {
synchronized (nettyClient) {
System.out.println("等待网络关闭完成....");
nettyClient.wait();
}
}
System.exit(1);
} else if ("v".equals(msg.toLowerCase())) {
User user = new User();
user.setAge(19);
String userName = "cover";
user.setUserName(userName);
user.setId("No:1");
user.setUserContact(
new UserContact(userName + "@gmail.com", "133"));
nettyClient.sendOneWay(user);
} else {
nettyClient.send(msg);
}
}
}
}
17.常量类
package constant;
import java.util.Date;
/**
* 常量
*/
public class Constant {
public static final Integer DEFAULT_PORT = 7777;
public static final String DEFAULT_SERVER_IP= "127.0.0.1";
// 根据输入信息拼接出一个应答信息
public static String response(String msg) {
return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString();
}
}
18.结果展示
Server端启动包含认证
Client端启动
心跳应答
ONE_WAY
TWO_WAY
客户端退出请求
原文地址:https://blog.csdn.net/Cover_sky/article/details/135921583
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!