首页 > 技术文章 > Netty笔记(3) - 核心组件

xjwhaha 2020-08-24 16:28 原文

各组件关系示意图:

Bootstrap 和 ServerBootstrap

说明: Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类

常见方法:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个 EventLoop

  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop

  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现

  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置

  • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置

  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的 handler)

  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号

  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端

EventLoopGroup 和其实现类 NioEventLoopGroup

  1. EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
  2. EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
  3. 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示

  • BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例BossEventLoop 不断轮询 Selector 将连接事件分离出来
  • 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup
  • WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理

ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数

常用参数:

  • ChannelOption.SO_BACKLOG 用来初始化服务器可连接队列大小。
  • ChannelOption.SO_KEEPALIVE 一直保持连接活动状态

Future、ChannelFuture

说明: Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

常见方法:

  • Channel channel(),返回当前正在进行 IO 操作的通道

  • ChannelFuture sync(),等待异步操作执行完毕

Channel

说明: Netty 网络通信的组件,能够用于执行网络 I/O 操作。客户端和服务端连接建立的通道抽象类, (类似java原生的Socket),读写数据都是操作此对象

常见Channel子类:

    1. NioSocketChannel,异步的客户端 TCP Socket 连接。
    2. NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
    3. NioDatagramChannel,异步的 UDP 连接。
    4. NioSctpChannel,异步的客户端 Sctp 连接。
    5. NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector

java NIO的 事件监听器,通过 Selector,当建立连接,创建Channel后,就注册进一个Selector对象, 一个Selector线程可以监听多个连接的 Channel中发生的事件。Selector 内部的机制就可以自动不断地查询这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel

ChannelHandler 及其实现类

  1. ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 业务处理链 (PipeLine)中的下一个Handler。

  2. ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类

  3. ChannelHandler 及其实现类一览图(后)

  1. 我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑

    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    
       
        @Skip
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelRegistered();
        }
    
       
        @Skip
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
        }
    
       //通道第一个激活
        @Skip
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }
    
        //通道销毁
        @Skip
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
       //读取通道数据
        @Skip
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    
       //数据读取完毕事件
        @Skip
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
        }
    
       
        @Skip
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
        
        @Skip
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelWritabilityChanged();
        }
    
       //发生异常
        @Skip
        @Override
        @SuppressWarnings("deprecation")
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.fireExceptionCaught(cause);
        }
    }
    
    

ChannelHandlerContext

  1. 保存 Channel 相关的所有上下文信息,同时包装了 ChannelHandler 对象,方便ChannelHandler 链的调用(例如他有上一个 下一个Handler的引用)

  2. 即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便ChannelHandler进行调用.

PipeLine

入站: 以客户端为例,客户端读取服务端发来的信息,读取Chanel中的数据,

出站: 则为向服务端发送信息,向Chanel中写数据

说明: ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound(入站) 或者 outbound (出站)的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是 保存 (ChannelHandlerContext对象,而ChannelHandlerContext 包含ChannelHandler,pipe类并没有直接接手Handler的 List) ,用于处理或拦截 Channel 的入站事件和出站操作)

  1. 当一个Chanle通道创建时 都会创建一个 pipeline与之关联,而pipeline中则包含了 各种业务处理类 例如 编码解码,自定义Handler,这些Handler 会在chanel不同的状态中 按情况执行(入站,出站)

  2. ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互

  3. 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下

对上图的说明:

  • 如图所示, Handler从左至右的顺序 是由注册的先后顺序执行的

    当发生入站事件时, Handler从左至右依次执行相关的 inboundHandler,出站时则从右至左执行outboundHandler

    自己画了一个草图:

  • 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler

  • 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰

常用方法:

  • ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
  • ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置

Unpooled 类

Netty 提供一个专门用来操作缓冲区(即Netty的数据容器ByteBuf)的工具类

常见操作演示:

public static void main(String[] args) {

    //创建一个ByteBuf
    //说明
    //1. 创建 对象,该对象包含一个数组arr , 是一个byte[10]
    //2. 在netty 的buffer中,不需要使用flip 进行反转
    //   底层维护了 readerindex 和 writerIndex
    //3. 通过 readerindex 和  writerIndex 和  capacity, 将buffer分成三个区域
    // 0---readerindex 已经读取的区域
    // readerindex---writerIndex , 可读的区域
    // writerIndex -- capacity, 可写的区域
    ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

    //使用相关的方法
    if(byteBuf.hasArray()) { // true

        byte[] content = byteBuf.array();

        //将 content 转成字符串
        System.out.println(new String(content, Charset.forName("utf-8")));

        System.out.println("byteBuf=" + byteBuf);

        System.out.println(byteBuf.arrayOffset()); // 0
        System.out.println(byteBuf.readerIndex()); // 0
        System.out.println(byteBuf.writerIndex()); // 12
        System.out.println(byteBuf.capacity()); // 36

        //System.out.println(byteBuf.readByte()); //
        System.out.println(byteBuf.getByte(0)); // 104

        int len = byteBuf.readableBytes(); //可读的字节数  12
        System.out.println("len=" + len);

        //使用for取出各个字节
        for(int i = 0; i < len; i++) {
            System.out.println((char) byteBuf.getByte(i));
        }
        //按照某个范围读取
        System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
        System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
    }
}

总结: 对Netty的核心组件进行基本的介绍,再回过头看笔记2中的案例 是不是清楚了一点

推荐阅读