首页 > 技术文章 > Java NIO的工作方式

wcyBlog 2015-08-09 23:03 原文

1、BIO带来的挑战

  BIO即阻塞IO,不管是磁盘IO,还是网络IO,数据在写入OutputStream或者从InputStream读取时都有可能发生阻塞,一旦有阻塞,当前线程将会被挂起,即线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,线程将会失去CPU的使用权,即线程暂停运行,这在当前的大规模访问量和有性能要求的情况下是不能被接受的。虽然当前的网络I/O有一些解决办法,如一个客户端一个处理线程,出现阻塞时只是一个线程阻塞而不会影响其他线程工作,还有为了减少系统线程的开销  ,采用线程池的办法来减少线程创建和回收的成本。如下例子:

Socket的Server端

 

public class Server {
 
   public static void main(String args[]) throws IOException {
      //为了简单起见,所有的异常信息都往外抛
      int port = 8899;
      //定义一个ServerSocket监听在端口8899上
      ServerSocket server = new ServerSocket(port);
      while (true) {
         //server尝试接收其他Socket的连接请求,server的accept方法是阻塞式的
         Socket socket = server.accept();
         //每接收到一个Socket就建立一个新的线程来处理它(这里可以使用pool线程池)
         new Thread(new Task(socket)).start();
      }
   }
   
   /**
    * 用来处理Socket请求的
    */
   static class Task implements Runnable {
      private Socket socket;
      public Task(Socket socket) {
         this.socket = socket;
      }
      public void run() {
         try {
            handleSocket();
         } catch (Exception e) {
            e.printStackTrace();
         }
      }

      /**
       * 跟客户端Socket进行通信
      * @throws Exception
       */
      private void handleSocket() throws Exception {
         BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "GBK"));
         StringBuilder sb = new StringBuilder();
         String temp;
         int index;
         while ((temp=br.readLine()) != null) {
            System.out.println(temp);
            if ((index = temp.indexOf("eof")) != -1) {//遇到eof时就结束接收
             sb.append(temp.substring(0, index));
                break;
            }
            sb.append(temp);
         }
         System.out.println("客户端: " + sb);
         //读完后写一句
       Writer writer = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
         writer.write("你好,客户端。");
         writer.write("eof\n");
         writer.flush();
         writer.close();
         br.close();
         socket.close();
      }
   }
}

 client客户端

public class Client {
 
   public static void main(String args[]) throws Exception {
      //为了简单起见,所有的异常都直接往外抛
     String host = "127.0.0.1";  //要连接的服务端IP地址
     int port = 8899;   //要连接的服务端对应的监听端口
     //与服务端建立连接
     Socket client = new Socket(host, port);
      //建立连接后就可以往服务端写数据了
     Writer writer = new OutputStreamWriter(client.getOutputStream(), "GBK");
      writer.write("你好,服务端。");
      writer.write("eof\n");
      writer.flush();
      //写完以后进行读操作
     BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"));
      //设置超时间为10秒
     client.setSoTimeout(10*1000);
      StringBuffer sb = new StringBuffer();
      String temp;
      int index;
      try {
         while ((temp=br.readLine()) != null) {
            if ((index = temp.indexOf("eof")) != -1) {
                sb.append(temp.substring(0, index));
                break;
            }
            sb.append(temp);
         }
      } catch (SocketTimeoutException e) {
         System.out.println("数据读取超时。");
      }
      System.out.println("服务端: " + sb);
      writer.close();
      br.close();
      client.close();
   }
}

  在以上代码中,你会发现在读取写入的流中会有”eof”的结束标记符,在服务端同时读写时,读取客户端发送的数据完毕后,while循环没有跳出来,服务端也将无法向客户端返回数据,所以在while的判断中加上一个条件,当遇到自定义的结束符时,将跳出循环,继续执行下面代码。

  但是有一些使用场景下仍然是无法解决的,如当前一些需要大量HTTP长连接的情况;这里说一下什么是长连接: HTTP1.1规定了默认保持长连(HTTP persistent connection ,也有翻译为持久连接),数据传输完成了保持TCP连接不断开(不发RST(RST包用于强制关闭TCP链接)、不四次握手),等待在同域名下继续用这个通道(Channel)传输数据;相反的就是短连接。像淘宝现在使用的Web旺旺,服务端需要同时保持几百万的HTTP连接,但并不是每时每刻这些 连接都需要传输数据,这种情况下不可能同时创建这么多线程来保持连接。即使线程的数量不是问题,仍然有一些问题是无法避免的,比如我们想给某些客户端更高的服务优先级,很难通过设计线程的优先级来完成。另外一种情况是,每个客户端的请求在服务端可能需要访问一些竞争资源,这些客户端在不同线程中,因此需要同步,要实现这种同步操作远比用单线程复杂得多。以上这些情况都说明,我们需要另外一种新的I/O操作方式。

2、NIO的工作机制

  我们先看一下NIO相关类图

  图中有 两个关键类:Channelselector,他们是NIO的两个核心概念。我们有城市交通工具来比喻NIO的工作方式,这里的Channel哟啊比Socket更加具体,它可以比作某种具体的交通工具,如汽车或高铁,而Selector可以比作一个车站的车辆运行调度系统,它将负责监控每辆车的当前运行状态,是已经出站,还是在路上等。也就是它可以轮询每个Channel状态。这里还有一个buffer类 ,它也比Stream更加具体,我们可以将它比作车上的车位。Channel是汽车的话Buffer就是汽车上的座位,它始终是一个具体的概念,与Stream不同,Stream只能代表一个座位 ,至于是什么座位有你自己去想象,也就是你在上车之前并不知道这个车上是否还有没有座位,也不知道上的额是什么车 ,因为你并不能选择。而这些信息都已经被封装在了运输工具(Socket)里面(Socket里面自带了一个IO Stream),对你是透明的(你是看不见的,你并不知道它的大小,工作方式)

  NIO引入了ChannelBufferSelector就是想把这些信息具体化,让程序员能够有机会控制它们。例如,当我呢吧调用write()SendQ中写数据时,当一次写的数据超过SendQ长度时需要按照SendQ的长度进行分割,这个过程中需要将用户空间数据和内核地址空间进行切换,而这个切换不是你可以控制的,但在Buffer中我们可以控制Buffer的容量、是否扩容以及如何扩容。

  这里不得不说一下SendQRecvQ的两个参数了

  RecvQ:表示网络接收队列。表示收到的数据已经在本地接收缓冲,但是还有部分没有被进程取走,就缓冲在队列,知道进程有空闲取走为止。一般缓冲在队列的时间不会很长。如果接收队列RecvQ一直处在阻塞状态,可能遭受了拒绝服务denial-of-service攻击。

  SenQ:表示网络发送队列。对方没有收到的数据或者没有Ack(正确应答信号)的,IO流还是在本地缓冲区。如果发送队列SendQ不能很快清零,可能是有应用向外发送数据包过快,或者对方接收数据包不够快。

这两个值通常应该为0的,如果不为0可能是有问题的。Packets包在两个队列里都不应该有堆积状态。可接受短暂的非零情况。

  理解了这些概念后,我们看一下实际上它们是如何工作的,下面是一段典型的NIO代码:

public void selector() throws IOException{
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		Selector selector = Selector.open();
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);//非阻塞
		ssc.socket().bind(new InetSocketAddress(8080));
		ssc.register(selector, SelectionKey.OP_ACCEPT);//注册监听事件
		while(true){
			Set selectedkeys = selector.selectedKeys();//取得所有key的集合
			Iterator it = selectedkeys.iterator();//set集合迭代出key
			while(it.hasNext()){//判断it的是否还有下一个,没有返回false
				SelectionKey key = (SelectionKey) it.next();
				if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { 
					ServerSocketChannel serverChanel = (ServerSocketChannel)key.channel();
					SocketChannel sc = serverChanel.accept();
					sc.configureBlocking( false ); 
					SelectionKey newKey = sc.register( selector,SelectionKey.OP_READ);
					it.remove();
				}else if((key.readyOps() & SelectionKey.OP_READ)== SelectionKey.OP_READ){ 
					SocketChannel sc = (SocketChannel)key.channel(); //通过学号知道是谁问的问题 
					int bytesEchoed = 0; 
					while(true){ 
						buffer.clear();
						int n = sc.read(buffer);
						if(n<=0){
							break;
						}
						buffer.flip();//反转缓冲区,这时操作系统就可以正确的读取字节发送出去
					}
					it.remove();
				}
			}
		}
	}

以上代码,就算看着JDK API可能理解起来比较吃力,所以通过某些情景来理解,印象可能会更加深刻。

在用NIO通讯的过程中我用以下情景模拟:

1. 学校(ServerSocketChannel) 

2. 学校教务处(Selector) 

3. 老师 (ServerSocket ) 

4. 学生 (SocketChannel) 

5. 员工号/学生号(SelectionKey

学校:相当于我们的网络应用程序,一旦学校启动,学校就不停止,不断运行,直到学期结束; 

要启动学校就要: 

ServerSocketChannel ssc= ServerSocketChannel.open();//新建NIO通道 
ssc.configureBlocking( false );//使通道为非阻塞 

 老师: 相当于服务端的Socket,一个老师对应多个学生,多个学生向老师请教,老师会一一做出回答。而学校要正常运营当然当不了老师,所以在开学之前,必须先聘请专业的老师来任教 

ServerSocket ss = ssc.socket();//创建基于NIO通道的socket连接 
//新建socket通道的端口 
ss.bind(new InetSocketAddress("127.0.0.1",SERVERPORT)); 

 学校教务处: 老师都有了,但是需要有部门对老师和学生做统一的管理, 如果你去一个学校找一个人,实在是找不到,你可以告诉教务处,那个人是学生还是老师,是老师的话员工编号老师姓名的多少,是学生的话学号和姓名是多少,教务处就会找到告诉你他在哪里。

//将NIO通道选绑定到择器,当然绑定后分配的主键为skey 
SelectionKey skey = ssc.register( selector, SelectionKey.OP_ACCEPT ); 

ssc注册了选择器后,其下的老师ServerSocket就也入了员工册了。所以老师的编号就是skey 

学生: 学校、老师、教务处都有了,现在就可以招生了! 

如果有学生来报名: 

while(true){//除非学期结束,否则一直等待学生 
int num = selector.select();//获取通道内是否有选择器的关心事件, 意思是有多少学生报告(注:select()方法:选择一组键,其相应的通道以为I/O操作准备就绪。返回值可能为0) 
if(num<1){continue; } //当select()方法返回值>1,说明I/O操作还没准备就绪,将阻塞。小于1,往下继续执行。
Set selectedKeys = selector.selectedKeys();//获取通道内关心事件的集合 ,这里的集合就是老师和学生的编号集合,如果key是学生的,那就是老学生来问问题,如果key是老师的,那就是招生办的老师带着一个新生来注册 
Iterator it = selectedKeys.iterator(); 
while (it.hasNext()) {//遍历每个key (学生key和老师key) 
....... 
} 
..... 
} 

既然有学生来报告,那有两种可能,一种是招生老师带着新生来注册的,一种是老生来问问题的。 

上面的while (it.hasNext()) 体可以这样写: 

while (it.hasNext()) {//遍历每个事件 
try{ 
SelectionKey key = (SelectionKey)it.next(); //先得到这个学生的编号key 
//判断是新生报道还是老生问问题 
if ((key.readyOps() & SelectionKey.OP_ACCEPT) 
== SelectionKey.OP_ACCEPT) { 
//这是招生老师的Key说明是新生注册,先找到招生老师,再由招生老师找到新生,就可以给新生注册学号了 
ServerSocketChannel serverChanel = (ServerSocketChannel)key.channel(); //通过key把学校和老师找到了 
//从serverSocketChannel中创建出与客户端的连接socketChannel 有了老师才有学生,不可能我教计算机的,来一个想学李小龙的都让他报名 
SocketChannel sc = serverChanel.accept(); //学生报名成功 
sc.configureBlocking( false ); 
// 把新连接注册到选择器,新生被接收后给注册个新学号 
SelectionKey newKey = sc.register( selector, 
SelectionKey.OP_READ ); //注册学号成功,并分配学生的权限 
it.remove(); //新生注册任务完成了,呵呵 
System.out.println( "Got connection from "+sc ); 
}else 
//读客户端数据的事件,此时有客户端发数据过来,客户端事件 这是老学生来问问题了。 
if((key.readyOps() & SelectionKey.OP_READ)== SelectionKey.OP_READ){ 
// 读取数据 ,接受学生的问题 
SocketChannel sc = (SocketChannel)key.channel(); //通过学号知道是谁问的问题 

//下面接受问题 
int bytesEchoed = 0; 
while((bytesEchoed = sc.read(buffer))> 0){ 
System.out.println("bytesEchoed:"+bytesEchoed); 
} 
echoBuffer.flip(); 
System.out.println("limet:"+echoBuffer.limit()); 
byte [] content = new byte[echoBuffer.limit()]; 
echoBuffer.get(content); 
String result=new String(content); 
doPost(result,sc); //相应老师会去做回答的,细节自己去写吧 
echoBuffer.clear(); 
it.remove(); //任务完成,记得上面也是一样,要remove掉,否则下一次又来一次任务,就死循环了 
} 
}catch(Exception e){} 
} 
}

补充:

ssc.register( selector, SelectionKey.OP_ACCEPT ); 

这个方法是把ssc注册绑定到选择器selector 这样下次你想找ssc或者判断一个对象是不是ssc就可以通过selector来查找,查找是通过判断ssckey得到的。

至于第二个参数SelectionKey.OP_ACCEPT 你可以理解成ssckey类型或者操作权限

如果 ssc是学校老师,那么绑定成功后 老师就拥有了OP_ACCEPT的权限或者说他的key类型是SelectionKey.OP_ACCEPT 

Accept是接受的意思,这是不是很像socket编程里的 accept()方法呢? 是的,没错,我们正是通过这个参数给了老师招生和带学生来注册的权限。

而学生呢? 

他拥有的权限为SelectionKey.OP_READ 表示有收发读取消息的权限,即问问题的权限,因此他不能帮别的学生注册。

所以你回到上面仔细看看while结构体里面做了判断如下:

if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {...} 很明显,拥有Accept权限的人只可能是老师,那老师有什么事会找教务处? 那肯定就是他是招生办的,招到一个学生来报名来注册了。

然后,马上给这个新连上来的客户端分配了一个key

SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ ); 看,这里只给他OP_READ,而不是Accept

另一个if 

else if((key.readyOps() & SelectionKey.OP_READ)== SelectionKey.OP_READ){ 

//很明显,这是这学生,因为所有带OP_READ的人都是前面由招生办老师带过来注册过的。

    总结一下:调用Selector的静态工厂创建一个选择器.open();创建一个服务端的ChannelServerSocketChannel.open();并把这个通信信道注册到选择器上ssc.register(selector, SelectionKey.OP_ACCEPT);,把这个通信信道设置为非阻塞模式。然后就可以调用SelectorselectedKeys()方法来检查已经注册在这个选择器上的所有通信信道是否有需要的事件发生,如果有某个事件发生,将会返回所有的SelectedKey,通过这个对象Channel()方法就可以取得这个通信信道对象,从而可以读取通信的数据,而这里读取的数据是Buffer,这个Buffer是我们可以控制的缓冲器。

  在上面这段程序中,将Server端的监听连接请求的事件和处理请求的事件放在一个线程中,但是在事件应用中,我们通常会把它们放在两个线程中:一个线程专门负责监听客户端的连接请求,而且是以阻塞方式执行的;另外一个线程专门负责处理请求,这个专门处理请求的线程才会真正采用NIO的方式,像web服务器TomcatJetty都使用这个处理方式。

下图描述了基于NIOSocket请求的处理过程。

  其中的Selector可以同时监听一组通信信道Channel上的I/O状态,前提是这个Selector已经注册到这些信道中。选择器Selector可以调用select()方法检查已经注册的通信信道是哪个的I/O是否已经准备好,如果没有至少一组信道I/O状态有改变,那么select()方法会阻塞等待或在超时时间后返回0.如果有多个信道有数据,那么将会把这些数据分配到对应的数据Buffer中。所以关键的地方是,有一个线程来处理所有连接的数据交互,每个连接的数据交互都不是阻塞方式,所以可以同时处理大量的连接请求。

 

推荐阅读