自学内容网 自学内容网

Netty的高级用法(一)

前言

我们直到在网络通信中客户端和服务端之间除了要传输数据外,还会进行简单的心跳应答通信,使得客户端和服务端的连接处于一种活跃状态,那么客户端可以发送ONE_WAY和TWO_WAY两种方式的处理,而服务端在处理这两种类型的数据时会做出不同的应答,对于ONE_WAY形式的应答,有可能会交由异步线程池来执行,而对于TWO_WAY形式的消息,则是立刻做出回应,除了这些,还会牵扯到序列化和反序列化、数据加密验证的问题,因为网络通信中数据是二进制流的形式传输的,这其中会牵扯到粘包/半包的问题,以及序列化和反序列性能问题。在解决这些基本组件之后,服务端还可以对于客户端进行认证,不在白名单的客户端不接受连接。再者就是针对客户端的输入,服务端做出不同的应答。接着给大家展示一个实例,希望可以帮到大家.
先导入以下maven依赖

 <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

        <dependency>
            <groupId>org.msgpack</groupId>
            <artifactId>msgpack</artifactId>
            <version>0.6.12</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.4</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.4</version>
        </dependency>

        <dependency>
            <groupId>com.itextpdf</groupId>
            <artifactId>itextpdf</artifactId>
            <version>5.5.8</version>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.49</version>
            <type>jar</type>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.49</version>
            <type>jar</type>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>0.42</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

1.Server

package adv;

import adv.server.ServerInit;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServer {
    
    private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
    
    public void bind() throws InterruptedException {
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1,
                new DefaultThreadFactory("boss"));
        EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors(),
                new DefaultThreadFactory("nt_worker"));
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ServerInit());
        
        // 绑定端口,同步等待成功
        ChannelFuture channelFuture = b.bind(Constant.DEFAULT_PORT).sync();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("绑定成功。。。。" + future.toString());
            }
        });
        
        LOG.info("Netty server start :" + Constant.DEFAULT_SERVER_IP + "  :" + Constant.DEFAULT_PORT);
        
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServer().bind();
    }
    
}
  

2.ServerPipeline

package adv.server;

import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import adv.server.asyncpro.DefaultTaskProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class ServerInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 粘包半包问题
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                0, 2,0,2));
        
        ch.pipeline().addLast(new LengthFieldPrepender(2));
        
        // 序列化相关
        ch.pipeline().addLast(new KryoDecoder());
        ch.pipeline().addLast(new KryoEncoder());
        // 处理心跳超时
        ch.pipeline().addLast(new ReadTimeoutHandler(15));

        ch.pipeline().addLast(new LoginAuthRespHandler());
        ch.pipeline().addLast(new HeartBeatRespHandler());

        ch.pipeline().addLast(new ServerBusinessHandler(new DefaultTaskProcessor()));
        
    }
}

3.服务端业务处理

package adv.server;

import adv.server.asyncpro.AsyncBusinessProcess;
import adv.server.asyncpro.ITaskProcessor;
import adv.vo.EncryptUtils;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerBusinessHandler extends SimpleChannelInboundHandler<MyMessage> {

    private static final Logger LOG = LoggerFactory.getLogger(ServerBusinessHandler.class);

    private ITaskProcessor taskProcessor;

    public ServerBusinessHandler(ITaskProcessor taskProcessor) {
        super();
        this.taskProcessor = taskProcessor;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
        // 检查MD5
        String headMd5 = msg.getMsgHeader().getMd5();
        String calcMd5 = EncryptUtils.encryptObj(msg.getBody());
        if (!headMd5.equals(calcMd5)) {
            LOG.error("报文MD5检查不通过:" + headMd5 + "vs" + calcMd5 + ", 关闭连接");
            ctx.writeAndFlush(buildBusinessResp("报文MD5检查不通过,关闭连接"));
            ctx.close();
        }
        LOG.info(msg.toString());

        if (msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()) {
            LOG.debug("ONE_WAY类型消息,异步处理");
            AsyncBusinessProcess.submitTask(taskProcessor.execAsyncTask(msg));
        } else {
            LOG.debug("TWO_WAY类型消息,应答");
            ctx.writeAndFlush(buildBusinessResp("OK"));
        }

    }

    private MyMessage buildBusinessResp(String result) {
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.SERVICE_RESP.value());
        myMessage.setMsgHeader(msgHeader);
        myMessage.setBody(result);
        return myMessage;
    }
}

4.安全中心

package adv.server;

import constant.Constant;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class SecurityCenter {
    
    // 用以检查用户是否重复登录的缓存
    private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();
    
    // 用户登录的白名单
    private static Set<String> whiteList = new CopyOnWriteArraySet<>();
    
    static {
        whiteList.add(Constant.DEFAULT_SERVER_IP);
    }
    
    public static boolean isWhiteIP(String ip) {
        return whiteList.contains(ip);
    }
    
    public static boolean isDupLog(String usrInfo) {
        return nodeCheck.containsKey(usrInfo);
    }
    
    public static void addLoginUser(String usrInfo) {
        nodeCheck.put(usrInfo, true);
    }
    
    public static void removeLoginUser(String usrInfo) {
        nodeCheck.remove(usrInfo, true);
        
    }
}

5.登录认证

package adv.server;

import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class LoginAuthRespHandler  extends ChannelInboundHandlerAdapter {
    
    
    private static final Logger LOG = LoggerFactory.getLogger(LoginAuthRespHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);
        MyMessage message = (MyMessage) msg;
        // 是不是握手认证请求
        if (message.getMsgHeader() != null 
            && message.getMsgHeader().getType() == MessageType.LOGIN_REQ.value()) {
            LOG.info("收到客户端认证请求 :" + message);
            String nodeIndex = ctx.channel().remoteAddress().toString();
            MyMessage loginResp = null;
            boolean checkAutuPass = false;
            // 重复登录,拒绝,这里用客户端的地址代替了实际的用户信息
            if (SecurityCenter.isDupLog(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
                LOG.warn("拒绝重复登录, 应答消息:" + loginResp);
                ctx.writeAndFlush(loginResp);
                ctx.close();
            } else {
                // 检查用户是否在白名单中,在则允许登录,并写入缓存
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                if (SecurityCenter.isWhiteIP(ip)) {
                    SecurityCenter.addLoginUser(nodeIndex);
                    loginResp = buildResponse((byte) 0);
                    LOG.info("认证通过,应答消息:" + loginResp);
                    ctx.writeAndFlush(loginResp);
                } else {
                    loginResp = buildResponse((byte) -1);
                    LOG.warn("认证失败, 应答信息 :" + loginResp);
                    ctx.writeAndFlush(loginResp);
                    ctx.close();
                }
            }
            ReferenceCountUtil.release(msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private MyMessage buildResponse(byte result) {
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.LOGIN_RESP.value());
        myMessage.setMsgHeader(msgHeader);
        myMessage.setBody(result);
        return myMessage;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);
        SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());
        ctx.close();
    }
}

6.心跳处理

package adv.server;

import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
    
    private static final Logger LOG = LoggerFactory.getLogger(HeartBeatRespHandler.class);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);
        MyMessage message = (MyMessage)msg;
        // 是不是心跳请求
        if (message.getMsgHeader() != null 
                && message.getMsgHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            // 心跳应答报文
            MyMessage heartBeatResp = buildHeartBeat();
            LOG.debug("心跳应答: " + heartBeatResp);
            ctx.writeAndFlush(heartBeatResp);
            ReferenceCountUtil.release(msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private MyMessage buildHeartBeat() {
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.HEARTBEAT_RESP.value());
        myMessage.setMsgHeader(msgHeader);
        return myMessage;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            LOG.warn("客户端长时间未通信,可能已经宕机,关闭链路");
            SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());
            ctx.close();
        }

        super.exceptionCaught(ctx, cause);
    }
}

7.ONE_WAY/TWO_WAY处理

package adv.server.asyncpro;

import io.netty.util.NettyRuntime;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncBusinessProcess {
    
    private static ExecutorService executorService = 
            new ThreadPoolExecutor(1,
                    NettyRuntime.availableProcessors(),
                    60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000));
    
    
    public static void submitTask(Runnable task) {
        executorService.execute(task);
    }
}

package adv.server.asyncpro;

import adv.vo.MyMessage;

// 消息转任务处理器
public interface ITaskProcessor {
    
    Runnable execAsyncTask(MyMessage msg);
}

package adv.server.asyncpro;

import adv.vo.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTaskProcessor implements ITaskProcessor{
    
    private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskProcessor.class);
    @Override
    public Runnable execAsyncTask(MyMessage msg) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                LOG.info("DefaultTaskProcessor模拟任务处理:" +  msg.getBody());
            }
        };
        return task;
    }
}

8.实体类以及加密

package adv.vo;

public class MyMessage {
    
    private MsgHeader msgHeader;
    
    private Object body;

    public MsgHeader getMsgHeader() {
        return msgHeader;
    }

    public void setMsgHeader(MsgHeader msgHeader) {
        this.msgHeader = msgHeader;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }


    @Override
    public String toString() {
        return "MyMessage{" +
                "msgHeader=" + msgHeader +
                ", body=" + body +
                '}';
    }
}

package adv.vo;

import java.util.HashMap;
import java.util.Map;

// 消息头
public class MsgHeader {
    
    // 消息体的MD5摘要
    private String md5;
    
    // 消息ID,因为是同步处理模式,不考虑应答消息需要填入请求消息ID
    private long msgId;
    
    // 消息类型
    private byte type;
    
    // 消息优先级
    private byte priority;
    
    private Map<String, Object> attachment = new HashMap<>();


    public String getMd5() {
        return md5;
    }

    public void setMd5(String md5) {
        this.md5 = md5;
    }

    public long getMsgId() {
        return msgId;
    }

    public void setMsgId(long msgId) {
        this.msgId = msgId;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    public Map<String, Object> getAttachment() {
        return attachment;
    }

    public void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "md5='" + md5 + '\'' +
                ", msgId=" + msgId +
                ", type=" + type +
                ", priority=" + priority +
                ", attachment=" + attachment +
                '}';
    }
}

package adv.vo;

public enum MessageType {
    
    SERVICE_REQ((byte) 0), // 业务请求消息
    
    SERVICE_RESP((byte) 1), // TWO_WAY消息,需要业务应答
    
    ONE_WAY((byte) 2), // 无需应答的业务请求消息
    
    LOGIN_REQ((byte) 3), // 登录请求消息
    
    LOGIN_RESP((byte) 4), // 登录响应消息
    
    HEARTBEAT_REQ((byte) 5), // 心跳请求消息
    
    HEARTBEAT_RESP((byte) 6), // 心跳应答消息
    
    
    ;
    
    private byte value;
    
    MessageType (byte value) {
        this.value = value;
    }
    
    public byte value() {
        return this.value;
    }
}

package adv.vo;

import java.util.concurrent.atomic.AtomicLong;

public class MakeMsgId {
    
    private static AtomicLong msgId = new AtomicLong(1);
    
    public static long getID() {
        return msgId.getAndIncrement();
    }
}

package adv.vo;

import adv.kryocodec.KryoSerializer;

import java.security.MessageDigest;

public class EncryptUtils {
    
    private static String EncryptStr(String strSrc, String encName) {
        MessageDigest md  = null;
        String strDes = null;

        byte[] bt = strSrc.getBytes();
        try {
            if (encName == null || encName.equals("")) {
                encName = "MD5";
            }
            
            md = MessageDigest.getInstance(encName);
            md.update(bt);
            strDes = bytes2Hex(md.digest());
        } catch (Exception e) {
            System.out.println("Invalid algorithm.");
            return null;
        }
        
        return strDes;
    }

    /**
     * MD5 摘要
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行MD5摘要后,将摘要字符串返回
     */
    public static String EncryptByMD5(String str) {
        return EncryptStr(str, "MD5");
    }

    /**
     * SHA1摘要
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行SHA-1摘要后,将摘要字符串返回
     */
    public static String EncryptBySHA1(String str) {
        return EncryptStr(str, "SHA-1");
    }
    /**
     * SHA256摘要
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行SHA-256摘要后,将摘要字符串返回
     */
    public static String EncryptBySHA256(String str) {
        return EncryptStr(str, "SHA-256");
    }

    /**
     * 字节转十六进制,结果以字符串形式呈现
     */

    private static String bytes2Hex(byte[] bts) {
        String des = "";
        String tmp = null;

        for (int i = 0; i < bts.length; i++) {
            tmp = (Integer.toHexString(bts[i] & 0xFF));
            
            if (tmp.length() == 1) {
                des += "0";
            }
            des += tmp;
            
        }
        
        return des;
    }
    
    /**
     * 对字符串进行MD5加盐摘要 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的
     * 1,3,5个字符追加到摘要串,再拿这个摘要串再次进行摘要
     */
    private static String encrypt(String str) {
        String encryptStr = EncryptByMD5(str);
        if (encryptStr != null) {
            encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);
            encryptStr = EncryptByMD5(encryptStr);
        }
        return encryptStr;
    }
    
    public static String encryptObj(Object o) {
        return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));
    }
    
}

9.Kryo序列化/反序列化

package adv.kryocodec;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KryoSerializer {
    
    private static Kryo kryo = KryoFactory.createKryo();
    
    // 序列化
    public static void serialize(Object object, ByteBuf out) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeClassAndObject(output, object);
        
        output.flush();
        output.close();

        byte[] b = baos.toByteArray();
        try {
            baos.flush();
            baos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        out.writeBytes(b);
    }
    
    // 序列化为一个字节数组,主要用在消息摘要上
    public static byte[] obj2Bytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeClassAndObject(output, object);
        output.flush();
        output.close();

        byte[] b = baos.toByteArray();
        try {
            baos.flush();
            baos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        return b;
    }
    
    
    public static Object deserialize(ByteBuf out) {
        if (out == null) {
            return null;
        }
        
        Input input = new Input(new ByteBufInputStream(out));
        return kryo.readClassAndObject(input);
    }
}

package adv.kryocodec;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;

import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

public class KryoFactory {
    
    
    public static Kryo createKryo() {
        
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        kryo.register(Arrays.asList("").getClass(),  new ArraysAsListSerializer());
        kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
        kryo.register(InvocationHandler.class, new JdkProxySerializer());
        kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
        kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
        kryo.register(Pattern.class, new RegexSerializer());
        kryo.register(BitSet.class, new BitSetSerializer());
        kryo.register(URI.class, new URISerializer());
        kryo.register(UUID.class, new UUIDSerializer());
        
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        SynchronizedCollectionsSerializer.registerSerializers(kryo);
        
        kryo.register(HashMap.class);
        kryo.register(ArrayList.class);
        kryo.register(LinkedList.class);
        kryo.register(HashSet.class);
        kryo.register(TreeSet.class);
        kryo.register(Hashtable.class);
        kryo.register(Date.class);
        kryo.register(Calendar.class);
        kryo.register(ConcurrentHashMap.class);
        kryo.register(SimpleDateFormat.class);
        kryo.register(GregorianCalendar.class);
        kryo.register(Vector.class);
        kryo.register(BitSet.class);
        kryo.register(StringBuffer.class);
        kryo.register(StringBuilder.class);
        
        kryo.register(Object.class);
        kryo.register(Object[].class);
        kryo.register(String[].class);
        kryo.register(byte[].class);
        kryo.register(char[].class);
        kryo.register(int[].class);
        kryo.register(float[].class);
        kryo.register(double[].class);
        
        return kryo;
    }
}

package adv.kryocodec;

import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class KryoEncoder extends MessageToByteEncoder<MyMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {
        KryoSerializer.serialize(msg, out);
        ctx.flush();
    }
}

package adv.kryocodec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// 反序列化的Handler
public class KryoDecoder extends ByteToMessageDecoder {


    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        Object obj = KryoSerializer.deserialize(in);
        out.add(obj);
    }
}

package adv.kryocodec;

import adv.vo.EncryptUtils;
import adv.vo.MakeMsgId;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.HashMap;
import java.util.Map;

public class TestKryoCodeC {
    
    
    public MyMessage getMessage(int j) {
//        String content = "abcdefg-----------AAAAAAAAAA" + j;
        String content =  "abcdefg-----------AAAAAA" + j;
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        
        msgHeader.setMsgId(MakeMsgId.getID());
        msgHeader.setType((byte) 1);
        msgHeader.setPriority((byte) 7);
        msgHeader.setMd5(EncryptUtils.encryptObj(content));
        Map<String, Object> attachment = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            attachment.put("city --> " + i, "cover " + i);
        }
        
        msgHeader.setAttachment(attachment);
        myMessage.setMsgHeader(msgHeader);
        myMessage.setBody(content);
        
        return myMessage;
    }

    public static void main(String[] args) {
        TestKryoCodeC testC = new TestKryoCodeC();

        for (int i = 0; i < 5; i++) {
            ByteBuf sendBuf = Unpooled.buffer();
            MyMessage message = testC.getMessage(i);
            System.out.println("Encode:" + message);
            KryoSerializer.serialize(message, sendBuf);
            MyMessage decodeMsg = (MyMessage)KryoSerializer.deserialize(sendBuf);
            System.out.println("Decode:" + decodeMsg);
            System.out.println("-----------------------------------------------");
        }
    }
}

10.业务实体类

package adv.business;

import serializable.msgpack.UserContact;

public class User {
    
    private String id;
    
    private String userName;
    
    private int age;
    
    private UserContact userContact;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public UserContact getUserContact() {
        return userContact;
    }

    public void setUserContact(UserContact userContact) {
        this.userContact = userContact;
    }

    public User() {
    }

    public User(String id, String userName, int age) {
        this.id = id;
        this.userName = userName;
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", userName='" + userName + '\'' +
                ", age=" + age +
                ", userContact=" + userContact +
                '}';
    }
}

package adv.business;

public class UserContact {
    
    private String mail;
    
    private String phone;

    public UserContact() {
    }

    public UserContact(String mail, String phone) {
        this.mail = mail;
        this.phone = phone;
    }

    public String getMail() {
        return mail;
    }

    public void setMail(String mail) {
        this.mail = mail;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return "UserContact{" +
                "mail='" + mail + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }
}

11.ClientPipeline()

package adv.client;

import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;


public class ClientInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        
        // 连接写空闲检测
        ch.pipeline().addLast(new CheckWriteIdleHandler());
        // 粘包半包
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                0,2,0,
                2));
        
        ch.pipeline().addLast(new LengthFieldPrepender(2));
        
        // 序列化相关
        ch.pipeline().addLast(new KryoDecoder());
        ch.pipeline().addLast(new KryoEncoder());
        
        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        ch.pipeline().addLast(new LoginAuthReqHandler());
        
        // 连续读空闲检测
        ch.pipeline().addLast(new ReadTimeoutHandler(15));
        // 向服务器发出心跳请求
        ch.pipeline().addLast(new HeartBeatReqHandler());
        ch.pipeline().addLast(new ClientBusiHandler());
    }
}

12.客户端认证

package adv.client;

import adv.NettyServer;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发出认证请求
        MyMessage loginMsg = buildLoginReq();
        LOG.info("请求服务器认证:" + loginMsg);
        ctx.writeAndFlush(loginMsg);
//        super.channelActive(ctx);
    }

    private MyMessage buildLoginReq() {
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.LOGIN_REQ.value());
        myMessage.setMsgHeader(msgHeader);
        return myMessage;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);

        MyMessage message = (MyMessage) msg;
        if (message.getMsgHeader() != null
                && message.getMsgHeader().getType() == MessageType.LOGIN_RESP.value()) {
            LOG.info("收到认证应答报文,服务器是否验证通过?....");
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte)0) {
                // 握手成功,关闭连接
                LOG.warn("未通过认证,关闭连接: " + message);
                ctx.close();
            } else {
                LOG.info("通过认证, 移除本处理器, 进入业务通信 :" + message);
                ctx.pipeline().remove(this);
                ReferenceCountUtil.release(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}

13.客户端心跳

package adv.client;

import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 客户端在长久未向服务器业务请求时,发出心跳请求报文
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
            MyMessage heartBeat = buildHeartBeat();
            LOG.debug("写空闲,发出心跳报文维持连接: " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        }
        super.userEventTriggered(ctx, evt);
    }

    private MyMessage buildHeartBeat() {
        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.HEARTBEAT_REQ.value());
        myMessage.setMsgHeader(msgHeader);
        return myMessage;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);
        MyMessage message = (MyMessage)msg;
        if (message.getMsgHeader() != null
                && message.getMsgHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
            LOG.debug("收到服务器心跳应答, 服务器正常");
            ReferenceCountUtil.release(msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            LOG.warn("服务器长时间未应答, 关闭链路");
        }
        super.exceptionCaught(ctx, cause);
    }
}

14.客户端检查写空闲

package adv.client;

import io.netty.handler.timeout.IdleStateHandler;

public class CheckWriteIdleHandler extends IdleStateHandler {
    
    
    public CheckWriteIdleHandler () {
        super(0,8,0);
    }
}

15.客户端业务处理

package adv.client;

import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientBusiHandler extends SimpleChannelInboundHandler<MyMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(ClientBusiHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {
        
        LOG.info("业务应答消息: " + msg.toString());
    }
}

16.客户端启动

package adv;

import adv.client.ClientInit;
import adv.vo.*;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NettyClient implements Runnable{

    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);

    // 负责重连的线程池
    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    
    private Channel channel;
    
    private EventLoopGroup group = new NioEventLoopGroup();
    
    // 是否用户主动关闭连接的标志
    private volatile boolean userClose = false;
    
    // 连接是否成功关闭的标志
    private volatile boolean connected = false;
    
    public boolean isConnected() {
        return connected;
    }
    
    public void connect(int port, String host) throws InterruptedException {
        try {
            // 客户端启动必备
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class) // 指定使用NIO的通信模式
                    .handler(new ClientInit());
            ChannelFuture future = b.connect(new InetSocketAddress(host, port)).sync();
            LOG.info("已连接服务器");
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    LOG.info("连接事件产生回调....operationCompletable");
                }
            });

            channel = future.channel();
            synchronized (this) {
                this.connected = true;
                this.notifyAll();
            }

            channel.closeFuture().sync();            
        } finally {
            if (!userClose) {
                // 非正常关闭,有可能发生了网络问题
                LOG.warn("需要进行重连");
                
                executorService.execute(() -> {
                    try {
                        // 给操作系统足够的时间取释放相关的资源
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
//                        throw new RuntimeException(e);
                    }
                });
            } else {
                // 正常关闭
                channel = null;
                group.shutdownGracefully().sync();
                synchronized (this) {
                    this.connected = false;
                    this.notifyAll();
                }
            }
        }

    }
    @Override
    public void run() {
        try {
            connect(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void sendOneWay(Object message) throws IllegalAccessException {
        if (channel == null || !channel.isActive()) {
            throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");
        }

        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        
        msgHeader.setMsgId(MakeMsgId.getID());
        msgHeader.setType(MessageType.ONE_WAY.value());
        msgHeader.setMd5(EncryptUtils.encryptObj(message));
        myMessage.setMsgHeader(msgHeader);
        myMessage.setBody(message);
        
        channel.writeAndFlush(myMessage);
    }

    public void send(Object message) throws IllegalAccessException {
        if (channel == null || !channel.isActive()) {
            throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");
        }

        MyMessage myMessage = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();

        msgHeader.setMsgId(MakeMsgId.getID());
        msgHeader.setType(MessageType.SERVICE_REQ.value());
        msgHeader.setMd5(EncryptUtils.encryptObj(message));
        myMessage.setMsgHeader(msgHeader);
        myMessage.setBody(message);

        channel.writeAndFlush(myMessage);
    }
    
    public void close() {
        userClose = true;
        channel.close();
    }
}

package adv;

import adv.business.User;
import serializable.msgpack.UserContact;

import java.util.Scanner;

public class BusiClient {

    public static void main(String[] args) throws InterruptedException, IllegalAccessException {
        NettyClient nettyClient = new NettyClient();
        new Thread(nettyClient).start();
        while (!nettyClient.isConnected()) {
            synchronized (nettyClient) {
                nettyClient.wait();
            }
        }

        System.out.println("网络通信已准备好,可以进行业务操作了。。。。。");
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String msg = scanner.next();
            if (msg == null) {
                break;
            } else if ("q".equals(msg.toLowerCase())) {
                nettyClient.close();
                scanner.close();
                while (nettyClient.isConnected()) {
                    synchronized (nettyClient) {
                        System.out.println("等待网络关闭完成....");
                        nettyClient.wait();
                    }
                }
                System.exit(1);
            } else if ("v".equals(msg.toLowerCase())) {
                User user = new User();
                user.setAge(19);
                String userName = "cover";
                user.setUserName(userName);
                user.setId("No:1");
                user.setUserContact(
                        new UserContact(userName + "@gmail.com", "133"));
                
                nettyClient.sendOneWay(user);
                
            } else {
                nettyClient.send(msg);
            }
        }
    }
}

17.常量类

package constant;

import java.util.Date;

/**
 * 常量
 */
public class Constant {
    
    public static final Integer DEFAULT_PORT = 7777;
    
    public static final String DEFAULT_SERVER_IP= "127.0.0.1";
    
    // 根据输入信息拼接出一个应答信息
    public static String response(String msg) {
        return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString(); 
    }
}

18.结果展示

Server端启动包含认证

在这里插入图片描述

Client端启动

在这里插入图片描述

心跳应答

在这里插入图片描述
在这里插入图片描述

ONE_WAY

在这里插入图片描述
在这里插入图片描述

TWO_WAY

在这里插入图片描述
在这里插入图片描述

客户端退出请求

在这里插入图片描述


原文地址:https://blog.csdn.net/Cover_sky/article/details/135921583

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!