IO多路复用-Reactor模式
前言
学习了NIO(New Input/Output)的基础后,已经对IO的多路复用有了初步的了解。关于IO多路复用的技术还有哪些值得深入学习的,大家所熟知的Reactor网络编程模式便是使用IO多路复用技术。希望文章可以帮助你对Reactor模式的不同实现有个清晰的认识。
传统IO(阻塞/非阻塞)
先来看看传统的IO模型,对于一个服务器有多个线程,每个线程处理1个客户端请求,包括连接、读取数据、业务处理、发送数据整个过程。
可以看出服务能处理的并发数,很大程度受线程数的影响。
说明:
- 简单, 并发量不大时没有问题。
- 并发量大时,比较依赖于服务器的线程数;
- 线程多了,线程创建和线程调度等会消耗更多的CPU和内存;
- read和send操作属于IO操作,与process串行处理,相当于它们是相互阻塞的,降低响应速度;
IO多路复用-Reactor模式
基于传统的IO,是否可以以更低的资源来处理更多的请求。JDK1.4提供了一套非阻塞IO的API(API相关内容可以看另一篇文章,这里主要说网络IO),本质上是借助操作系统select、poll、epoll这些内核的东西实现了一套以事件驱动的IO多路复用的网络IO操作。
Reactor(反应堆)就是基于这套实现而提出的以事件驱动的IO模型。
这里面有两个概念:IO多路复用 和 Reactor。
Reactor已经说了,字面意思是反应堆,对事件的“反应”或“响应”,是一个抽象概念定义。
IO多路复用是五种IO模型的一种,可以看之前的文章,简单理解就是通过一个或几个线程监听一批网络连接的IO事件(包括连接、读、写)。
Reactor模式由Reactor和资源处理两部分组成:
- Reactor:负责事件监听与分发,包括连接、读写事件。
- 资源处理:负责处理事件,如数据读取、业务处理、数据发送。
常见的Reactor模式:
- 单Reactor单线程
- 单Reactor多线程
- 多Reactor多线程
线程可以换成进程,介绍主要以线程为主
单Reactor单线程
单Reactor单线程模式如下:
在该模式下,包括三个角色:Reactor、Acceptor、Handler
- Reactor:监听和分发所有的IO事件。
- Acceptor:处理连接事件。
- Handler:处理读写事件和业务逻辑。
其中的accept、read、send属于系统调用函数,dispatch和process分别是事件分发和业务逻辑处理。
其大致过程:
1、Reactor通过IO多路复用接口(如Selector的select函数)监听事件,收到事件后进行分发;
2、如果是连接事件交由Acceptor,Acceptor通过accept获取连接,并创建Handler进行后续读写事件的处理;
3、如果是其它事件交由对应的Handler处理,进行读取、业务处理、写出等整个过程。
该方案解决了传统IO线程资源问题,且以事件通知方式驱动。但是也有其缺点:
1、在进行Acceptor或者Handler时,无法同时处理其它连接,会造成并发下响应延迟;
2、单个线程处理无法充分利用CPU的多核优势。
但是对于能够快速进行业务处理的场景,如Redis这种内存数据库,6.0版本前,使用这种模式,也能很快,瓶颈不在CPU上,好处是没有资源的竞争和上下文切换。
单Reactor多线程
基于单Reactor单线程,优化出单Reactor多线程模式,如下:
如图,将IO事件与业务处理分离,在处理IO事件的同时可以进行业务逻辑处理,充分利用CPU资源。
但连接和读写IO事件仍在一个线程中处理,面对高并发,仍为成为性能瓶颈。
多Reactor多线程
单Reactor变多Reactor,将Reactor拆分成两种(不一定是两个),MainReactor和SubReactor:
MainReactor:主线程的MainReactor使用IO多路复用选择器监听IO连接事件,将其交由Acceptor处理,Acceptor将连接交由某个子线程;
SubReactor:子线程中的SubReactor对新连接进行监听读写事件,并创建Handler处理后续的事件。
如下图所示:
说明:
1、这里的SubReactor也是由线程池处理或几个固定子线程,业务处理process在另一个线程池中处理。
2、注意SubReactor的线程数并不等于请求数,1个SubReactor可以监听多个SockctChannel的网络IO事件。
3、多Reactor多线程看起来复杂,其实相对简单,体现在 1)、两种Reactor分工明细;2)、交互简单,主Reactor只需要新连接给子线程即可,剩下的交给子线程跟进。
被大家所熟知的Netty和Memcache都是采用此模式。
示例
服务端
import cn.hutool.core.thread.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
/**
* 多Reactor多线程模式。
*
* 主Reactor负责处理连接
* 从Reactor负责处理读写
*/
@Slf4j
public class MultiReactor {
// 3个线程,1个主Reactor、2个从Reactor
private static final int THREAD_POLL_SIZE = 3;
static ExecutorService reactorThreadPool = new ThreadPoolExecutor(THREAD_POLL_SIZE, THREAD_POLL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("ReactorThreadPoll-", false));
int next = 0;
// 主Reactor负责处理连接
private Reactor mainReactor;
// 从Reactor负责处理读写
private Reactor[] subReactors = new Reactor[THREAD_POLL_SIZE - 1];
// 监听端口
private int port;
public static void main(String[] args) throws IOException {
MultiReactor multiReactor = new MultiReactor(8888);
multiReactor.start();
}
/**
* 分配线程
*/
public MultiReactor(int port) throws IOException {
this.port = port;
mainReactor = new Reactor("mainReactor");
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new Reactor("subReactor" + i);
}
}
/**
* 启动
*/
private void start() throws IOException {
// 主Reactor
Thread mrThread = new Thread(mainReactor);
// 创建ServerSocketChannel,注册selector,关注OP_ACCEPT事件
new Acceptor(mainReactor.getSelector(), port);
// reactor开始监听
reactorThreadPool.execute(mrThread);
// 从Reactor
for (int i = 0; i < subReactors.length; i++) {
Thread srThread = new Thread(subReactors[i]);
reactorThreadPool.execute(srThread);
}
}
/**
* Reactor抽象,包括selector监听事件
*/
static class Reactor implements Runnable {
// 队列,主Reactor处理接收事件时,要在从Reactor注册读写事件,但是从Reactor当时正阻塞在 select() 方法上,
// 所以借助队列,在达到先在select唤醒,及时注册,再select
private ConcurrentLinkedQueue<SocketChannel> events = new ConcurrentLinkedQueue<>();
// 用于识别当前reactor
private String name;
final Selector selector;
public Reactor(String name) throws IOException {
this.name = name;
selector = Selector.open();
}
public Selector getSelector() {
return selector;
}
@Override
public void run() {
// normally in a new Thread
// 处理IO事件
try {
// 死循环
while (!Thread.interrupted()) {
// 针对从Reactor,从select唤醒后,先尝试Register,不然内部有锁竞争
SocketChannel sChannel;
while ((sChannel = events.poll()) != null) {
// 初始化读写事件
new Handler(selector, sChannel);
}
// 阻塞,直到有通道事件就绪
log.info("{} 进入select监听", name);
selector.select();
// 拿到就绪通道 SelectionKey 的集合
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey skTmp = it.next();
// 根据 key 的事件类型进行分发
dispatch(skTmp);
}
// 清空就绪通道的 key
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 分发
*/
void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 拿到通道注册时附加的对象
if (r != null) r.run();
}
/**
* 注册IO事件
*/
void register(SocketChannel socketChannel) throws ClosedChannelException {
events.offer(socketChannel);
selector.wakeup();
}
}
/**
* 处理连接类
*/
class Acceptor implements Runnable {
final ServerSocketChannel serverSocketChannel;
/**
* 初始化通道,注册选择器,关注连接事件
*/
public Acceptor(Selector selector, int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
// bind
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port));
// 非阻塞
serverSocketChannel.configureBlocking(false);
// 注册到选择器,关注连接器事件
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(this);
log.info("Acceptor- mainReactor- new Acceptor listen port:{}", port);
}
@Override
public void run() {
// 处理连接事件
try {
log.info("Acceptor- 处理连接");
SocketChannel sChannel = serverSocketChannel.accept();
sChannel.configureBlocking(false);
// 这个新连接主要用于从客户端读取数据,对OP_READ事件感兴趣
Reactor subReactor = subReactors[next];
subReactor.register(sChannel);
if(++next == subReactors.length){
next = 0;
}
log.info("Acceptor- 处理连接完成");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 业务处理类
*/
static class Handler implements Runnable {
final SelectionKey selectionKey;
// 可以加线程池处理
public Handler(Selector selector, SocketChannel socketChannel) throws ClosedChannelException {
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// 管理事件的处理程序
key.attach(this);
this.selectionKey = key;
}
@Override
public void run() {
// 处理读写事件
SocketChannel sChannel = (SocketChannel) selectionKey.channel();
try {
String msg = readDataFromSocketChannel(sChannel);
log.info("Handler- msg:{}", msg);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 数据读取
* @param sChannel
* @return
* @throws IOException
*/
private String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(512);
StringBuilder sb = new StringBuilder();
buffer.clear();
int read = sChannel.read(buffer);
if (read <= 0) {
return "";
}
// 读取buffer
buffer.flip();
// 可读大小
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i= 0; i < limit; i++) {
dst[i] = (char)buffer.get(i);
}
sb.append(dst);
buffer.clear();
return sb.toString();
}
}
}
客户端
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 升级版客户端
*/
@Slf4j
public class NIOClient2 {
// 发送缓冲区
private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
// 接收缓冲区
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
// 打开选择器
Selector selector = Selector.open();
// 打开Socket通道
SocketChannel socketChannel = SocketChannel.open();
log.info("socketChannel:{} hashCode:{}", socketChannel, socketChannel.hashCode());
// 非阻塞方式
socketChannel.configureBlocking(false);
// 注册连接事件,SelectionKey表示Selector和Channel一个关系,充当粘合剂的作用。对OP_CONNECT事件感兴趣
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 发起连接 -> 非阻塞
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
while (true) {
// 阻塞到有IO事件
selector.select();
// 已就绪键值,能找到对应的Channel
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isConnectable()) {
// 可连接
System.out.println("client触发连接事件");
SocketChannel clientChannel = (SocketChannel) key.channel();
log.info("socketChannel:{} connect hashCode:{}", socketChannel, socketChannel.hashCode());
// 是否正在进行连接操作
if (clientChannel.isConnectionPending()) {
clientChannel.finishConnect();
System.out.println("client完成连接事件");
}
// 注册读事件,对OP_READ和OP_WRITE事件感兴趣, 不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件
clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 新线程监听控制台
new Thread(() -> consultInput(clientChannel)).start();
} else if (key.isReadable()) {
// 可读
SocketChannel clientChannel = (SocketChannel) key.channel();
receiveBuffer.clear();
// 读取数据
int count = clientChannel.read(receiveBuffer);
if(count > 0){
String receiveText = new String(receiveBuffer.array(), 0, count);
System.out.println("客户端接受服务器端数据--:" + receiveText);
}
} else if (key.isWritable()) {
// 可写
SocketChannel clientChannel = (SocketChannel) key.channel();
sendBuffer.clear();
String sendText = "write server";
sendBuffer.put(sendText.getBytes(StandardCharsets.UTF_8));
sendBuffer.flip();
clientChannel.write(sendBuffer);
System.out.println("客户端发送服务器端数据--:" + sendText);
clientChannel.register(selector, SelectionKey.OP_READ);
}
}
keys.clear();
}
}
/**
* 监听控制台的输入
*
* @param clientChannel
*/
private static void consultInput(SocketChannel clientChannel) {
// 通过consult获取输入
Scanner in = new Scanner(System.in);
while(true){
if(in.hasNextLine()) {
String line = in.nextLine();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
buffer.put(line.getBytes(StandardCharsets.UTF_8));
buffer.flip();
try {
clientChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
先启动服务端,再启动客户端,输出如下:
服务端:
Acceptor- mainReactor- new Acceptor listen port:8888
subReactor1 进入select监听
mainReactor 进入select监听
Acceptor- 处理连接
Acceptor- 处理连接完成
mainReactor 进入select监听
subReactor0 进入select监听
Handler- msg:write server
subReactor0 进入select监听
客户端:
socketChannel:java.nio.channels.SocketChannel[connection-pending remote=/127.0.0.1:8888] connect hashCode:353842779
client完成连接事件
客户端接受服务器端数据–:write server
扩展
Reactor与Proactor的区别:
- Reactor是非阻塞同步网络模式,感知的是就绪可读写事件,仍需要应用进程进行阻塞式读写内核数据;
- Proactor是异步网络模式,感知的是已完成的读写事件,数据已经被内核处理好了,通知应用进程,应用只需要操作应用内存数据。
这其实也是同步IO与异步IO的主要区别。
目前Window实现了真正的异步IO(IOCP技术),而Linux的aio系列不是真正操作系统级别的异步IO,是用户空间模拟出来的,且仅支持本地文件的异步IO,不支持Sockct网络IO,所以Linux下高性能网络程序都是基于Reactor。
总结
单Reactor单线程相对于传统IO处理,是一种分而治之的思想,但是它无法充分利用IO和CPU的资源;所以有了单Reactor多线程,将业务处理和IO处理进行了分离,将业务处理丢到线程池中处理,充分利用多核CPU,提高了效率;仍不能提高IO的速率,为了平衡IO的速率,使用多Reactor来饱和IO。
有场景有问题才会有优化手段;往往定义问题是解决服务问题的基础。分而治之通常是实现可伸缩性目标的最佳方法。
link:Scalable-IO-in-Java