首页 > 技术文章 > Netty笔记(5) - 编码解码机制 和 Protobuf技术

xjwhaha 2020-08-25 11:20 原文

介绍:

  • 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
  • codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据

示意图:

Netty自身提供的一些编解码器:

  1. Netty 提供的编码器
    StringEncoder,对字符串数据进行编码
    ObjectEncoder,对 Java 对象进行编码
    ...
  2. Netty 提供的解码器
    StringDecoder, 对字符串数据进行解码
    ObjectDecoder,对 Java 对象进行解码

Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题

  • 无法跨语言
  • 序列化后的体积太大,是二进制编码的 5 倍多。
  • 序列化性能太低

可以使用Google 的Protobuf 技术解决上述问题, 这个后面介绍 先说一下 编解码器的调用机制

用StringEncoder(编码) 和 StringDecoder(解码) 为例:

public class StringEncoder extends MessageToMessageEncoder<CharSequence>
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter 

查看源码得知 StringEncoder 间接继承了 ChannelOutboundHandlerAdapter类,所以 他实际上是一个outboundHandler(出站),很好理解,信息往外发送 当时是编码了,以此推断StringDecoder 是一个inboundHandler(入站),读取数据时解码.

编码解码器都是以Handler 处理器注册在pipeLine的调用链中 在读取 发送数据 时进行编解码

自定义编码解码器

服务端:

注册自定义的编码解码器

public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>{
                     @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                        //入站的handler进行解码 MyByteToLongDecoder
                        pipeline.addLast(new MyByteToLongDecoder());
                //        pipeline.addLast(new MyByteToLongDecoder2());
                        //出站的handler进行编码
                        pipeline.addLast(new MyLongToByteEncoder());
                        //自定义的handler 处理业务逻辑
                        pipeline.addLast(new MyServerHandler());
                        System.out.println("xx");
                    }
                }); //自定义一个初始化类


            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

自定义的 解码器 继承 ByteToMessageDecoder 会在自定义Handler类之前执行

在解码器 进行数据解码时,需要判断 缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致(** 继承ReplayingDecoder<S> 则不需要进行判断 这个类自动检测 数据是否符合 泛型S **):

读取通道中的数据 并调用 该Handler的 decode方法, ByteBuf则为读取到的数据,进行自定义的解码,并添加到List中, 方法结束后 会遍历List中的数据 多次执行下一个Handler

//源码
for (int i = 0; i < numElements; i ++) {
    ctx.fireChannelRead(msgs.getUnsafe(i));
}
public class MyByteToLongDecoder extends ByteToMessageDecoder {
    /**
     *
     * @param ctx 上下文对象
     * @param in 入站的 ByteBuf
     * @param out List 集合,将解码后的数据传给下一个handler . 下一个Handler 接受的数据就为List中的数据
            向下一个Handler执行的代码 如果List 数据为多个 则传递多次
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        //如果读取的数据 等于 16 则会 分两次执行下一个Handler
        System.out.println("MyByteToLongDecoder 被调用");
        //因为 long 8个字节, 需要判断有8个字节,才能读取一个long 
        if(in.readableBytes() >= 8) {
            out.add(in.readLong());
        }
    }
}

自定义编码器 继承MessageToByteEncoder类 泛型为 处理的数据 如果发送的数据不为 Long 则不会调用该方法 :

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    //编码方法
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("MyLongToByteEncoder encode 被调用");
        System.out.println("msg=" + msg);
        out.writeLong(msg);
    }
}

客户端:

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();

                            //加入一个出站的handler 对数据进行一个编码
                            pipeline.addLast(new MyLongToByteEncoder());
                            //这时一个入站的解码器(入站handler )
                            pipeline.addLast(new MyByteToLongDecoder());
//                            pipeline.addLast(new MyByteToLongDecoder2());
                            //加入一个自定义的handler , 处理业务
                            pipeline.addLast(new MyClientHandler());

                        }
                    }); //自定义一个初始化类

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}

客户端Handler 通道建立 发送信息 只有发送Long类型的数据才会调用编码器

public class MyClientHandler  extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
        System.out.println("收到服务器消息=" + msg);

    }

    //重写channelActive 发送数据

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 发送数据");
        ctx.writeAndFlush(123456L); //发送的是一个long

       // ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));

    }
}

Netty自带的解码器:

  • LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。

  • DelimiterBasedFrameDecoder:使用自定义的 特殊字符作为消息的分隔符。

  • HttpObjectDecoder:一个HTTP数据的解码器

  • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

Protobuf技术

上文中说到,在客户端和服务端传输数据的格式问题上 有着或多或少的问题,而Google Protobuf 就是解决这些问题的. 官方文档

工作机制:

使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件.

原理分析:

通过一定的规范 构建你想要表达的业务含义,只要解码端能够支持这种规范,就能还原出对应的 该语言代码(跨语言),而它本身也对序列化过程进行优化,极尽所能的压榨每一寸空间和性能.如netty的 支持此规范的 编码器中.

各个语言的字段表达含义:

protobuf 使用示意图:

简单的小案例

实例代码:

syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名

//protobuf 使用message 管理数据

message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
    int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
    string name = 2;
}

使用protobuf编译器解析这个文件,命令protoc.exe --java_out=. Student.proto

生成的java类伪代码:

class StudentPOJO{
    class Student{
        int id;
        String name;
    }
}

使用Netty 包装发送此对象

服务端 注册ProtobufDecoder 解码器:

public class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
//                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {


                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline加入ProtoBufDecoder
                            //指定对哪种对象进行解码
                            pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    }); 

            System.out.println(".....服务器 is ready...");

            //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //给cf 注册监听器,监控我们关心的事件

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功");
                    } else {
                        System.out.println("监听端口 6668 失败");
                    }
                }
            });
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

服务端Handler:

public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {


    //读取数据实际(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
        //读取从客户端发送的StudentPojo.Student
        System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端 发送数据给服务端 添加 ProtoBufEncoder编码器:

public class NettyClient {
    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入 ProtoBufEncoder
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            pipeline.addLast(new NettyClientHandler()); //加入自己的处理器
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {

            group.shutdownGracefully();

        }
    }
}

客户端Handler 构建Student类的过程看代码:

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //发生一个Student 对象到服务器
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
        //Teacher , Member ,Message
        ctx.writeAndFlush(student);
    }

    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

这个案例有个很大问题 ,就是已经对发送和接受数据的类型定死,下面进行改造:

proto 定义一个包含 Student 或者 Worker的类 发送接受时 根据 枚举字段 获取:

syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="com.atguigu.netty.codec2";   //指定生成到哪个包下
option java_outer_classname="MyDataInfo"; // 外部类名, 文件名

//protobuf 可以使用message 管理其他的message
message MyMessage {

    //定义一个枚举类型( 内部类 ))
    enum DataType {
        StudentType = 0; //在proto3 要求enum的编号从0开始
        WorkerType = 1;
    }

    //用data_type 来标识传的是哪一个枚举类型
    DataType data_type = 1;

    //表示每次枚举类型最多只能出现其中的一个, 节省空间  ,这两个属性只能赋值一个 赋值的后一个会替换前一个
    oneof dataBody {
        Student student = 2;
        Worker worker = 3;
    }

}
message Student {
    int32 id = 1;//Student类的属性
    string name = 2; //
}
message Worker {
    string name=1;
    int32 age=2;
}

java伪代码:

class MyDataInfo{
    class MyMessage{
        enum DataType{
            StudentType,WorkerType
        }
        
        DataType data_type;
        
        //Student 和Worker 只可存在一个 设置后一个会替换前一个
        Student student;
        Worker worker;
        
    }
    class Student{
        int id;
        String name;
    }
    class Worker{
        String name;
        int age;
    }
}

改造上述案例的 客户端发送信息的方法 channelActive (随机发送一个对象) :

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //随机的发送Student 或者 Workder 对象
    int random = new Random().nextInt(3);
    MyDataInfo.MyMessage myMessage = null;

    if(0 == random) { //发送Student 对象
        myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
    } else { // 发送一个Worker 对象
        myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
    }

    ctx.writeAndFlush(myMessage);
}

改造服务端 添加的 解码器 指定的是包含 Student 和Worker的 MyMessage对象:

...
//指定对哪种对象进行解码(注册解码器)
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
...

改造 服务端接受数据的Handler 指定接受数据类型的泛型也为MyMessage对象 ,并根据 DataType字段 获取相应的对象:

//处理Handler类 指定的对象为 MyMessage 类型
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    //读取数据实际(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

        //根据dataType 来显示不同的信息

        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
        if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {

            MyDataInfo.Student student = msg.getStudent();
            System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName());

        } else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
            MyDataInfo.Worker worker = msg.getWorker();
            System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
        } else {
            System.out.println("传输的类型不正确");
        }
    }

推荐阅读