锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

Netty(5)之私有协议栈开发

时间:2023-07-30 14:07:08 header11连接器

1. Netty协议栈功能设计

Netty基于内部模块之间通信的协议栈 TCP/IP 协议栈,是一个类 HTTP 与传统标准协议栈相比,协议的应用层协议栈更轻、更灵活、更实用

在这里插入图片描述

1.1 功能描述

Netty协议栈承载了业务内各模块之间的信息交互和服务调用,主要功能是:

1.2 交互模型

  • 客户端发送握手信息,携带节点ID等待有效的身份认证信息
  • 服务端对握手请求消息进行合法性校验,包括节点ID有效性检验、节点反复验证和IP验证地址合法性后,返回成功登录的握手响应信息
  • 链路建立成功后,客户端发送业务数据
  • 链路成功后,服务端发送心跳消息
  • 链路建立成功后,客户端发送心跳信息
  • 链路建立成功后,服务端发送业务数据
  • 服务退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接

备注: Netty协议通信双方链接建立成功后,双方可以进行双全工通信,客户和服务端都可以主动向对象发送请求信息,通信方式可以是 TWO WAY或者 ONE WAY。采用双方心跳 Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping服务端收到消息Ping消息发送回复消息后Pong如果客户端连续发送N条给客户端Ping没有收到服务端返回的消息。Pong消息显示链路已挂断或对方处于异常状态,客户端关闭连接,间隔T后重新连接,直到重新连接成功

1.3 消息定义表

协议栈消息定义包括两部分:

  • 消息头
  • 消息体
名称 类型 长度 描述
header Header 变长 消息头定义
body Object 变成 对于请求信息,它是方法参数;对于相应的信息,它是返回值

1.3.1 消息头

名称 类型 长度 描述
crcCode int 32 协议栈的校检码由三部分组成:
1:OxABEF:固定值表明该消息是协议消息,有两个字节
2:主版本号:1 - 255 ,1个字节
三:二次版本号:1 - 255, 1个字节
crcCode = 0xABEF 主版本号 次版本号
length int 32 整个消息包括消息头和消息体
sessionID long 64 会话是集群节点中唯一的全局ID生成器生成
type Byte 8 0:业务请求信息
1.业务响应信息
2:业务ONE WAY 消息(既是请求又是响应消息)
三、握手请求消息
4.握手回答消息
5.心跳请求信息
6:心跳应答消息
priority Byte 8 新闻优先级:0 - 255
attachment Map 变长 可选字段用于扩展消息头

1.3.2 协议编码规范

  • crcCode:ByteBuffer.putInt(int value),如果采用其他缓冲区,必须等价
  • length:ByteBuffer.putInt(int value),如果采用其他缓冲区,必须等价
  • sessionID:ByteBuffer.putLong(int value),如果采用其他缓冲区,必须等价
  • type:ByteBuffer.put(byte value),如果采用其他缓冲区,必须等价
  • priority:ByteBuffer.put(byte value),如果采用其他缓冲区,必须等价
  • attachment:如果长度为 0.如果没有可选附件,则将其长度编码为 0,ByteBuffer.put若大于 说明有附件需要编码
    • 首先,编码附件的数量:ByteBuffer.putInt(attachment.size())
    • Key的编码:buffer.writeString(key)
    • Value编码:转换成 Byte[] 数组,buffer.writeBinary(value)
  • body:序列化为 byte[] 然后调用数组 ByteBuffer.put(byte[] src),最后重新确定 length 重写字段ByteBuffer中

1.3.3 协议解码规范

  • crcCode:ByteBuffer.getInt()如果采用其他缓冲区,必须等价
  • length:ByteBuffer.getInt()如果采用其他缓冲区,必须等价
  • sessionID:ByteBuffer.getLong()如果采用其他缓冲区,必须等价
  • type:ByteBuffer.get()如果采用其他缓冲区,必须等价
  • priority:ByteBuffer.get(),如果采用其他缓冲区实现,必须与其等价
  • attachment:首先 ByteBuffer.getInt() 如果附件的长度为 0.说明附件为空,解码结束;如果不是空的,则根据长度循环解码
    • 获取附件长度:ByteBuffer.getInt()
    • Key的解码:buffer.readString()
    • Value的解码:buffer.readBinary(), 根据获得的数据在解码框中反序列化
  • body:通过框架解码

1.4 链路建立

对于使用 Netty 就协议栈的应用程序而言,无需刻意区分是客户端还是服务端。在分布式网络环境中,节点可能是服务端和客户端

Netty协议栈对客户端的调用说明如下:如果A节点启动呼叫B节点的服务,但A和B物理链路尚未建立,调用方将主动发起连接,此时调用方为客户端,调用方为服务端。

考虑到安全,建立链路需要基于IP以地址或号段黑白名单安全认证机制为例,本协议基于IP如果有多个地址的安全认证IP,通过逗号进行分割。

1.4.1 客户端请求

    li>消息头的type字段值为3
  • 可选附件为0
  • 消息体为空
  • 握手消息的长度为22个字节

1.4.2 服务端应答

  • 消息头的type字段值为4
  • 可选附件个数为0
  • 消息体为byte类型的结果,0:认证成功,-1:认证失败

1.5 链路关闭

  • 当对方宕机或者重启时,会主动关闭链路,当得知对方 REST 链路需要关闭连接,释放自身的句柄等资源。由于采用TCP双全工通信,通信双发都要关闭连接
  • 消息读写过程中,发生了 I/O 异常,需要主动关闭连接
  • 心跳消息读写过程中发生了 I/O 异常,需要主动关闭连接
  • 心跳超时,需要主动关闭
  • 发生编码异常等不可回复错误时,需要主动关闭连接

1.6 可靠性设计

为了保证在恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况,需要保证协议栈能够正常工作或者自动恢复,需要对其可靠性进行统一规划和设计

1.6.1 心跳机制

  • 当网络处于空闲时间达到 T(连续周期没有读写消息) 时,客户端主动发送 Ping
  • 如果在下一个周期 T 到来时客户端没有收到对方发送的 Pong 心跳应答,则心跳失败计数器 +1
  • 接收到客户端的业务消息或者Pong时,将心跳计数器清零;连续N次没有接收到服务端的Pong消息或者业务消息,则关闭链路,间隔INTERVAL时间后发起重连操作

通过Ping - Pong双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被即时的检测出来,为了防止对方短时间没有及时返回应答造成的误判,只有连续N次心跳检测都失败了才认定链路已经损害,需要关闭链路,重新建立链路

1.6.2 重连机制

如果链路中断,等待INTERVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功。

为了保证服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后就立即重连。

为了保证句柄资源能够及时释放,无论什么场景下的重连失败,客户端都必须保证自身的资源被及时释放,包括但不限于SocketChannel、Socket 等。

重连失败后,需要打印异常堆栈信息,方便后续的问题定位。

1.6.3 重复登录保护

当客户端握手成功之后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致句柄资源被耗尽。服务端接收到客户端的握手请求消息之后,首先对IP地址进行合法性检验,如果校验成功,在缓存的地址表中查看客户端是否已经登录,如果已经登录,则拒绝重复登录,返回错误码-1,同时关闭TCP链路,并在服务端的日志中打印握手失败的原因。

客户端接收到握手失败的应答消息之后,关闭客户端的TCP连接,等待INTERVAL时间之后,再次发起TCP连接,直到认证成功。

为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功的问题,当服务端连续N次心跳超时之后需要主动关闭链路,清空该客户端的地址缓存信息,以保证后续该客户端可以重连成功,防止被重复登录保护机制拒绝掉。

1.6.4 消息缓存重发

无论客户端还是服务端,当发生链路中断之后,在链路恢复之前,缓存在消息队列中待发送的消息不能丢失,等链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失。
考虑到内存溢出的风险,建议消息缓存队列设置上限,当达到上限之后,应该拒绝继续向该队列添加新的消息。

1.7 安全性设计

为了保证整个集群环境的安全,内部长连接采用基于IP地址的安全认证机制,服务端对握手请求消息的IP地址进行合法性校验:如果在白名单之内,则校验通过;否则,拒绝对方连接。

如果将Netty协议栈放到公网中使用,需要采用更加严格的安全认证机制,例如基于密钥和AES加密的用户名+密码认证机制,也可以采用SSL/TSL安全传输。

作为示例程序,Netty 协议栈采用最简单的基于IP地址的白名单安全认证机制。

1.8 可扩展性设计

Netty协议需要具备一定的扩展能力,业务可以在消息头中自定义业务域字段,例如消息流水号、业务自定义消息头等。通过Netty消息头中的可选附件attachment字段,业务可以方便地进行自定义扩展。

Netty协议栈架构需要具备一定的扩展能力,例如统一的消息拦截、 接口日志、安全、加解密等可以被方便地添加和删除,不需要修改之前的逻辑代码,类似Servlet的Filter Chain和AOP,但考虑到性能因素,不推荐通过AOP来实现功能的扩展。

2. Netty协议栈开发


    io.netty
    netty-all
    5.0.0.Alpha1




    org.jboss.marshalling
    jboss-marshalling
    1.3.0.CR9


    org.jboss.marshalling
    jboss-marshalling-serial
    1.3.0.CR9

2.1 通用对象

2.1.1 MarshallingCodeCFactory

public final class MarshallingCodeCFactory { 
        

    /** * 创建Jboss Marshalling解码器 * @return MarshallingDecoder */
    public static MarshallingDecoder buildMarshallingDecoder(){ 
        
        //首先通过Marshalling工具类的方法获取Marshalling实例对象,参数serial标识创建的是java序列化工厂
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建MarshallingConfiguration对象,配置版本号为5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);

        //根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);

        //构建Netty的MarshallingDecoder对象,俩参数分别为provider和单个消息序列化后的最大长度
        return new MarshallingDecoder(provider, 1024 * 1024);
    }

    /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */
    public static MarshallingEncoder buildMarshallingEncoder(){ 
        
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        return new MarshallingEncoder(provider);
    }

    /** * marshaller 编码器 * @return * @throws IOException */
    public static Marshaller buildMarshalling() throws IOException { 
        
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createMarshaller(configuration);
    }

    /** * Unmarshaller 解码器 * @return * @throws IOException */
    public static Unmarshaller buildUnMarshalling() throws IOException { 
        
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createUnmarshaller(configuration);
    }
}

2.1.2 消息类型

/** * 枚举消息类型 */
@Getter
public enum MessageType { 
        
    /** Login req message type */
    LOGIN_REQ(3, "握手请求消息"),
    /** Login resp message type */
    LOGIN_RESP(4, "握手应答消息"),

    HEARTBEAT_REQ(5, "心跳请求"),

    HEARTBEAT_RESP(6, "心跳响应");

    /** Describe */
    private final String describe;
    /** Code */
    private final byte code;

    /** * Message type * * @param code code * @param describe describe * @since 1.0 */
    MessageType(Integer code, String describe) { 
        
        this.describe = describe;
        this.code = code.byteValue();
    }
}

2.1.3 ip常量值

/** * ip和port地址 */
public class NettyConstant { 
        

    /** LOCAL_IP */
    public static final String LOCAL_IP = "本机IP";

    /** REMOTE_IP */
    public static final String REMOTE_IP = "服务器IP";

    /** REMOTE_PORT */
    public static final Integer REMOTE_PORT = 8900;

    /** LOCAL_PORT */
    public static final Integer LOCAL_PORT = 8901;
}

2.1.4 Header

/** * netty协议头部包 */
@Data
@ToString
@Builder
public class Header { 
        
    //协议校检码
    private int crcCode = 0xabef0101;
    //消息长度
    private int length;
    //全局唯一sessionId
    private long sessionId;
    //消息类型
    private byte type;
    //消息优先级
    private byte priority;
    //消息附件,用于扩展协议
    private Map<String, Object> attachment;
}

2.1.5 NettyProtocolMessage

/** * netty协议消息体 */
@Data
@ToString
@Builder
public final class NettyProtocolMessage { 
        
    //头部数据
    private Header header;
    //消息体
    private Object body;

    /** * 创建Message对象 * * @param byteBuf * @param decoder * @return */
    public static NettyProtocolMessage createMessage(ByteBuf byteBuf, MarshallingDecoder decoder) throws IOException { 
        
        //读取消息头部,按照指定直接的长度
        int crcCode = byteBuf.readInt();
        int length = byteBuf.readInt();
        long sessionId = byteBuf.readLong();
        byte type = byteBuf.readByte();
        byte priority = byteBuf.readByte();

        //读取附件数据
        int size = byteBuf.readInt();
        Map<String, Object> attach = null;
        if (size > 0) { 
        
            attach = new HashMap<>(size);
            for (int i = 0; i < size; i++) { 
        
                //先获取到 key数据的长度
                int keySize = byteBuf.readInt();
                //创建一个key长度的字节数据
                byte[] keyArrays = new byte[keySize];
                //读取数据
                byteBuf.readBytes(keyArrays);
                //转换成string字符串形式
                String key = new String(keyArrays, StandardCharsets.UTF_8);
                //读取value数据
                attach.put(key, decoder.decode(byteBuf));
            }
        }
        Object body = null;
        //如果剩下未读的数据长度大于4个字节,证明有body的数据,进行解码
        if (byteBuf.readableBytes() > 4) { 
        
            body = decoder.decode(byteBuf);
        }
        Header header = Header.builder()
            .crcCode(crcCode)
            .length(length)
            .sessionId(sessionId)
            .type(type)
            .priority(priority)
            .attachment(attach).build();
        return NettyProtocolMessage.builder().header(header).body(body).build();
    }
}

2.2 解码器

2.2.1 MarshallingDecoder

封装 Unmarshaller 用于对消息对象的解码

public class MarshallingDecoder { 
        

    /** * 构建一个解码器 */
    private final Unmarshaller provider;

    /** * Marshalling decoder * * @throws IOException io exception * @since 1.0 */
    public MarshallingDecoder() throws IOException { 
        
        this.provider = MarshallingCodeCFactory.buildUnMarshalling();
    }

    /** * 执行数据解码 * * @param byteBuf byte buf * @return object object * @throws IOException io exception * @since 1.0 */
    public Object decode(ByteBuf byteBuf) throws IOException { 
        
        //先读取对象的长度
        int objSize = byteBuf.readInt();
        //创建一个新的buf进行读取
        ByteBuf objBuf = byteBuf.slice(byteBuf.readerIndex(), objSize);
        ChannelBufferByteInput channelBufferByteInput = new ChannelBufferByteInput(objBuf);
        Object object = null;
        try { 
        
            provider.start(channelBufferByteInput);
            object = provider.readObject();
            provider.finish();
            //将读的索引位置设置到当前读的索引位置+上对象长度的位置
            byteBuf.readerIndex(byteBuf.readerIndex() + objSize);
        } catch (ClassNotFoundException e) { 
        
            e.printStackTrace();
        } finally { 
        
            provider.close();
        }
        return object;
    }
}

2.2.2 NettyMessageDecoder

NettyMessageDecoder :实现了 LengthFieldBasedFrameDecoder 定长包解码器,通过指定对应索引的字节数据作为包的长度数据,其中需要注意的是 lengthAdjustment 矫正字段的使用,用于矫正包的长度

/** * netty消息解码处理器:LengthFieldBasedFrameDecoder 对定长包进行处理 */
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder { 
        

    /** * 创建解码器对象 */
    private final MarshallingDecoder marshallingDecoder;

    /** * Netty message decoder * * @param maxFrameLength 包的最大长度 * @param lengthFieldOffset 字段偏移位置,比如在 buf.index为4的地方表示消息长度 * @param lengthFieldLength 消息长度字段的长度,例如int:就4个字节 * @param lengthAdjustment 长度矫正,如果不配置这里会导致读出来的长度比数据包的长度长,导致解析不了 * @param initialBytesToStrip 初始需要跳过的字节长度 * @throws IOException io exception * @since 1.0 */
    public NettyMessageDecoder(int maxFrameLength,
                               int lengthFieldOffset,
                               int lengthFieldLength,
                               int lengthAdjustment,
                               int initialBytesToStrip) throws IOException { 
        
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
        this.marshallingDecoder = new MarshallingDecoder();
    }

    /** * Decode * * @param ctx ctx * @param in in * @return the object * @throws Exception exception * @since 1.0 */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
        
        /** * 默认使用netty的包长度解码器 */
        ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
        if (byteBuf == null) { 
        
            return null;
        }
        return NettyProtocolMessage.createMessage(byteBuf, marshallingDecoder);
    }

    /** * Gets marshalling decoder * * * @return the marshalling decoder * @since 1.0 */
    public MarshallingDecoder getMarshallingDecoder() { 
        
        return marshallingDecoder;
    }
}

2.3.3 ChannelBufferByteInput

用于封装 ByteBuf 进行数据的读取

/** * 封装一个字节输入流,用于 Marshall 读取buffer中对象数据 */
public class ChannelBufferByteInput implements ByteInput { 
        

    /** Buffer */
    private final ByteBuf buffer;

    /** * Channel buffer byte input * * @param buffer buffer * @since 1.0 */
    public ChannelBufferByteInput(ByteBuf buffer) { 
        
        this.buffer = buffer;
    }

    /** * Close * * @throws IOException io exception * @since 1.0 */
    @Override
    public void close() throws IOException { 
        
        // nothing to do
    }

    /** * Available * * @return the int * @throws IOException io exception * @since 1.0 */
    @Override
    public int available() throws IOException { 
        
        return buffer.readableBytes();
    }

    /** * Read * * @return the int * @throws IOException io exception * @since 1.0 */
    @Override
    public int read() throws IOException { 
        
        if (buffer.isReadable()) { 
        
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    /** * Read * * @param array array * @return the int * @throws IOException io exception * @since 1.0 */
    @Override
    public int read(byte[] array) throws IOException { 
        
        return read(array, 0, array.length);
    }

    /** * Read * * @param dst dst * @param dstIndex dst index * @param length length * @return the int * @throws IOException io exception * @since 1.0 */
    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException { 
        
        int available = available();
        if (available == 0) { 
        
            return -1;
        }

        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    /** * Skip * * @param bytes bytes * @return the long * @throws IOException io exception * @since 1.0 */
    @Override
    public long skip(long bytes) throws IOException { 
        
        元器件数据手册IC替代型号,打造电子元器件IC百科大全!
          

相关文章