NIO基础-JDK
前言
Java NIO(New Input/Output)是Java1.4引入的一种IO操作方式,相对于Java IO(BIO),它提供了更高效的操作IO的方式。
Java IO是面向流的方式,而且是单向的;而Java NIO是面向块的,借助通道可双向传输。
Java IO在调用read或write方式时,会阻塞;Java NIO在调用read或write时如果缓冲区没有数据可读或已经写满时,不会阻塞(即IO操作不阻塞)。
文章主要偏向于介绍网络操作的NIO。
NIO核心组件
Java NIO涉及一些核心的概念,包括通道(Channel)、缓冲区(Buffer)、选择器(Selector)。
对应的实现在java.nio包下面
通道(Channel)
通道可以理解成IO操作的一个纽带,通过一个打开的连接链到一个实体,这个实体可以是软件磁盘、文件、网络Socket、执行IO读写操作的程序组件。
通道是双向的,通常也是线程安全的,常见的Channel有:
FileChannel:从文件中读取数据;
DatagramChannel:通过UDP读取网络中的数据;
SocketChannel:通过TCP读取网络中的数据;
ServerSocketChannel:监听新进入的连接,为每个连接创建SocketChannel。
关于SocketChannel,可以同时读和写操作,但是任何时候只有1个线程在读和1个线程在写。可以这么理解,1个线程在读的时候,另一个线程需要等待该通道的读操作;写类似。
核心api:
- open():创建一个尚未被连接的SocketChannel;
- connect():连接到一个Socket地址;
- finishConnect():完成连接,与connect相互同步,意思是调用finishConnect()会阻塞到connect()完成为止;read/write方法也与这两个连接方法同步互斥,确保数据安全。
缓冲区(Buffer)
缓冲区,应用程序发给通道Channel的所有数据需要放到Buffer中,从通道中读取数据需要先读到Buffer中。Buffer是线程不安全的。
Buffer的作用如下图所示:
缓冲区其实是一个数组,这个数组可以是内存数组,也可以是直接内存数组(堆外内存)。默认使用allocate()方法分配的是HeapByteBuffer。
缓冲区定义了7种原始数据类型的Buffer类,它们都是Abstract类型,如ByteBuffer、LongBuffer、BooleanBuffer等。其中Byte是1个字节长度,数据类型中最小的单元,ByteBuffer类中定了如获取int、long长度数据的方法。
缓冲区的状态变量包括:mark、position、limit、capacity
(mark <= position <= limit <= capacity)
- mark:标记,备忘位置,调用mark方法可以使其等于position,记录当前的position的位置。(默认-1)
- position:当前已经读写的字节数。(默认0)
- limit:最大可读写的字节数。(默认等于capacity)
- capacity:容量大小,不可变。
假设有一个容量8个字节的缓冲区:
1、如果是写入Buffer:
a、初始值:position=0(指向第一个字节);limit=capacity=8;
此时position表示已经写的字节数,limit表示可写的字节数
b、然后写入2个字节:position=2(指向第三个字节),limit=capacity=8;
2、如果是读取Buffer:
c、基于1,调用flip()翻转Buffer信息,此时,Buffer变成可读取。此时:position=0(指向第一个字节);limit=2(指向第三个字节),capacity=8;
此时position表示已经读的字节数,limit表示可读的字节数
d、然后读取1个字节后,position=1(指向第二个字节);limit=2(指向第三个字节),capacity=8;
过程如下图所示:
选择器(Selector)
NIO实现或提供了IO多路复用Reactor模型的基础能力,一个线程可以用一个选择器Selector通过轮询的方式监听多个通道Channel上的IO事件。
通过配置监听的Channel为非阻塞的,那么到IO事件未到达时,selector不会阻塞在某个Channel上,而是轮询其它的Channel,找到已经到达IO事件的Channel进行处理,但是找的过程对应用程序来说还是阻塞的。
Selector的核心方法select,监听就绪的事件,返回对应SelectionKey个数。找到后通过SelectionKey集合匹配已经就绪的Channel进行处理。select方法的底层实现,取决于操作系统来执行select、poll、epoll等系统调用。
在将通道Channel注册到选择器Selector上的时候,需要注册具体感兴趣的事件,包括以下几类:
- SelectionKey.OP_CONNECT = 1 << 3 = 8
- SelectionKey.OP_ACCEPT = 1 << 4 = 16
- SelectionKey.OP_READ = 1 << 0 = 1
- SelectionKey.OP_WRITE = 1 << 2 = 4
粘合剂(SelectionKey)
在将通道注册到选择器上的时候,会创建SelectionKey实例,并标记感兴趣的事件集合。SelectionKey表示选择器(Seletor)与网络通道(Channel)之间的令牌,也即粘合剂。
SelectionKey的主要作用包括:
- 兴趣集合:通道在注册到选择器时,配置感兴趣的事件,如SelectionKey.OP_READ,可以多个,用 | 拼接即可,最后创建一个SelectionKey实例;
- 准备就绪集合:通道有可操作事件的时候,通过SelectionKey可以跟踪对应事件;
- 附件:SelectionKey允许附加对象,做额外的记录
- 与选择器交互:当选择器的select()方法被调用,选择器就会检查所有注册的通道是否有任何就绪的事件。如果有,选择器将会返回,并且相应的SelectionKey将被添加到选择器的已就绪键集合中,以表示它的通道已经准备好进行某些操作了;
- 事件处理:程序可以通过检查每个就绪的SelectionKey来决定如何响应,如接受新连接、读取数据、写入数据等。
从构造器可以看出来,其组合了channel和selector
// sun.nio.ch.SelectionKeyImpl
// 感兴趣事件,如果是多个用 | 计算所得
private volatile int interestOps;
// 就绪事件
private int readyOps;
SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
// 通道
this.channel = var1;
// 选择器
this.selector = var2;
}
类图
Channel、Selector、SelectorProvider 类图如下:
示例
服务端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* nio服务端
*/
@Slf4j
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
// server的socketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 注册selector,对OP_ACCEPT事件感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// serverSocket
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8888));
while (true) {
// 阻塞获取事件
int select = selector.select();
log.info("select count:{}", select);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 连接事件
if (selectionKey.isAcceptable()) {
ServerSocketChannel ssChannel = (ServerSocketChannel)selectionKey.channel();
// 服务器会为每个新连接创建一个 SocketChannel
SocketChannel sChannel = ssChannel.accept();
sChannel.configureBlocking(false);
// 这个新连接主要用于从客户端读取数据,对OP_READ事件感兴趣
sChannel.register(selector, SelectionKey.OP_READ);
log.info("acceptChannel:{} hashCode:{}", sChannel, sChannel.hashCode());
} else if (selectionKey.isReadable()) {
// 可读
SocketChannel sChannel = (SocketChannel) selectionKey.channel();
log.info("readableChannel:{} hashCode:{}", sChannel, sChannel.hashCode());
String msg = readDataFromSocketChannel(sChannel);
log.info("msg:{}", msg);
// 不能close,否则无法再收到数据
// sChannel.close();
}
// 重要,这里移除的是selector内部的集合中的SelectionKey实例,对应已经处理过的io事件。
// 不移除的话,如果此次的SelectionKey出现,与之前的SelectionKey集合不全相同,会出错;但如果是全部相同的SelectionKey再次出现,会判重相等而不会报错
iterator.remove();
}
}
}
/**
* 读取通道的数据
* 正常需要数据解码,不然客户端发送多次,被一次读取完
*
* @param sChannel:socket通道
*/
private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder sb = new StringBuilder();
buffer.clear();
int read = sChannel.read(buffer);
if (read <= 0) {
return "";
}
// 读取buffer
buffer.flip();
// 可读大小
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i= 0; i < limit; i++) {
dst[i] = (char)buffer.get(i);
}
sb.append(dst);
buffer.clear();
return sb.toString();
}
}
客户端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 升级版客户端
*/
@Slf4j
public class NIOClient2 {
// 发送缓冲区
private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
// 接收缓冲区
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
// 打开选择器
Selector selector = Selector.open();
// 打开Socket通道
SocketChannel socketChannel = SocketChannel.open();
log.info("socketChannel:{} hashCode:{}", socketChannel, socketChannel.hashCode());
// 非阻塞方式
socketChannel.configureBlocking(false);
// 注册连接事件,SelectionKey表示Selector和Channel一个关系,充当粘合剂的作用。对OP_CONNECT事件感兴趣
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 发起连接 -> 非阻塞
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
while (true) {
// 阻塞到有IO事件
selector.select();
// 已就绪键值,能找到对应的Channel
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isConnectable()) {
// 可连接
System.out.println("client触发连接事件");
SocketChannel clientChannel = (SocketChannel) key.channel();
log.info("socketChannel:{} connect hashCode:{}", socketChannel, socketChannel.hashCode());
// 是否正在进行连接操作
if (clientChannel.isConnectionPending()) {
clientChannel.finishConnect();
System.out.println("client完成连接事件");
}
// 注册读事件,对OP_READ和OP_WRITE事件感兴趣, 不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件
clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 新线程监听控制台
new Thread(() -> consultInput(clientChannel)).start();
} else if (key.isReadable()) {
// 可读
SocketChannel clientChannel = (SocketChannel) key.channel();
receiveBuffer.clear();
// 读取数据
int count = clientChannel.read(receiveBuffer);
if(count > 0){
String receiveText = new String(receiveBuffer.array(), 0, count);
System.out.println("客户端接受服务器端数据--:" + receiveText);
}
} else if (key.isWritable()) {
// 可写
SocketChannel clientChannel = (SocketChannel) key.channel();
sendBuffer.clear();
String sendText = "write server";
sendBuffer.put(sendText.getBytes(StandardCharsets.UTF_8));
sendBuffer.flip();
clientChannel.write(sendBuffer);
System.out.println("客户端发送服务器端数据--:" + sendText);
clientChannel.register(selector, SelectionKey.OP_READ);
}
}
keys.clear();
}
}
/**
* 监听控制台的输入
*
* @param clientChannel
*/
private static void consultInput(SocketChannel clientChannel) {
// 通过consult获取输入
Scanner in = new Scanner(System.in);
while(true){
if(in.hasNextLine()) {
String line = in.nextLine();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
buffer.put(line.getBytes(StandardCharsets.UTF_8));
buffer.flip();
try {
clientChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
结果(先启动服务端,再启动客户端):
客户端部分提示:客户端发送服务器端数据–:write server
服务端部分提示:msg:write server
使用注意:
1、处理完的SelectionKey须移除,防止某些情况重复处理(见服务端代码注释,见iterator.remove());
2、如果同时对多个事件感兴趣,可以这么写:SelectionKey.OP_READ | SelectionKey.OP_WRITE
3、不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件
总结
NIO中核心的几个组件,包括Channel、Buffer、Selector、SelectionKey。他们都在java.nio包下面。
选择器Selector的出现,可以使得1个线程监听多个客户端的通道,感知每个通道上不同的IO事件,并通过SelectionKey来关联到对应的Channle进行处理,无疑是借助了内核的实现。