netty系列-事件循环源码分析
前言
本篇主要介绍Netty中一个重要的概念事件循环,及在非阻塞IO场景与事件循环相关的两个关键的类NioEventLoopGroup和NioEventLoop。文章切入点也是在服务器启动的主程序中。希望可以说清楚下面两点:
1、NioEventLoopGroup与NioEventLoop的概念与联系;
2、从源码的角度分析事件循环是如何进行的;
服务器启动主流程的源码分析,可以看Netty的另一篇文章>。
Netty版本:netty-4.1.106.Final
概念与联系
先看看事件循环概念,从字面上来说,所谓“事件”是指各种IO事件和要执行的任务,如建立连接,读写等;“循环”是指处理过程是持续不断的,一直等待和处理事件。
总结一下,Netty中提到的事件循环是指 EventLoop 对应的线程不停的轮询处理分配给它的Channel上的事件和其它任务。
再看看NioEventLoopGroup与NioEventLoop,在Netty中,NioEventLoop对应非阻塞的IO多路复用的事件处理场景。EventLoop是上文事件循环的术语和对应接口。它由一个永远都不会改变的Thread驱动,在其整个生命周期内,进行处理与其关联的Channel的IO事件和任务。
非阻塞:是对于主程序来说,发送IO数据等事件不会阻塞主程序。通过事件回调通知结果。
IO多路复用:一个线程可以对多个Channel进行监听,Selector是抽象层,底层使用操作系统的select、poll、epoll能力。
NioEventLoopGroup与NioEventLoop对应的类图(idea生成):
由图可知:
1、它是JDK线程执行器的派生,对应Netty的线程模型;
2、EventLoop间接继承了ScheduledExecutorService,使其支持任务调度(延迟或周期执行任务)。
几个关键接口的说明:
EventExecutorGroup:通过next()方法提供EventExecutor,并管理它们的生命周期及可以全局关闭它,它继承JDK的Executor相关接口,具有任务提交和执行的能力。方法有next()、showdown()、submit(Runnable r)等。
EventExecutor:继承EventExecutorGroup,是一个特殊的EventExecutorGroup,它额外提供了一些方法用于很方便的查看当前线程是否在eventLoop事件循环中进行的。如inEventLoop()方法。
EventLoopGroup:继承EventExecutorGroup,也是一个特殊的EventExecutorGroup,允许注册Channel,这些Channel在后续可以进行Selector。方法如register(Channel channel)等。
EventLoop:继承EventLoopGroup和EventExecutor,Channel注册后,进行处理所有的IO事件。额外提供了parent()方法。
小结一下:
含Loop关键词的针对IO事件场景进行的定义,处理事件循环。
含Executor:属于通用的任务执行机制。
Nio中,EventLoop对应Channel为1:n
Oio中,EventLoop对应Channel为1:1,截止4.1.106.Final版本已标记废除
事件循环源码
创建过程
在Bootstrap引导时,定义EventLoopGroup如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
1、接下来看看构造器初始化NioEventLoopGroup,具体都做了哪些事。
进入NioEventLoopGroup(1)构造器查看:
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
如果nThreads不传,nThreads赋值为0,下面会提到对0的处理。
调用内部重载构造器:
public NioEventLoopGroup(int nThreads, Executor executor) {
// SelectorProvider.provider() 初始化一个JDK NIO的SelectorProvider
this(nThreads, executor, SelectorProvider.provider());
}
SelectorProvider属于JDK的NIO相关类,这里会创建对应的实例。
继续调用重载构造器,会依次创建,如下:
DefaultSelectStrategyFactory.INSTANCE:选择器执行策略工厂类,决定着后续如何做事件循环,目前就这一种实现。
任务添加失败时的拒绝策略,new RejectedExecutionHandler(),会抛出异常RejectedExecutionException。
2、继续调用父类MultithreadEventLoopGroup构造器
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// DEFAULT_EVENT_LOOP_THREADS,默认为CPU核心数 * 2
// 调用父类MultithreadEventExecutorGroup的构造方法
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
此处会判断传入线程数量是否为0,为0表示未设置,则默认为机器CPU核数*2。
3、继续调用父类MultithreadEventExecutorGroup构造器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
这里主要关注,实例化一个ExecutorChooser的工厂,工厂生成的类决定如何选择下一个EventLoop。
DefaultEventExecutorChooserFactory.INSTANCE 单例模式,有两个内部类,也是对应工厂生成的类:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,PowerOfTwoEventExecutorChooser 用于线程数为2的幂次方的情况,获取下一个线程使用&计算更快(Netty的高性能体现)。
继续调用构造器:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 初始化是为null
if (executor == null) {
// 默认使用ThreadPerTaskExecutor,其实现了JDK的Executor接口;就一个方法,使用线程工厂创建线程并执行
// 参数newDefaultThreadFactory()对应线程工厂类:DefaultThreadFactory
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建EventExecutor类型的数组,长度为nThreads
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 核心方法,创建EventExecutor,由子类实现
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 初始化不成功的情况,优雅关闭之前所有已经初始化的线程
// ...
}
}
// 创建eventLoop的选择器,用于选择下一个eventLoop使用
chooser = chooserFactory.newChooser(children);
// Future完成监听器,当所有的eventLoop都关闭时,设置terminationFuture为成功
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 当一个 EventExecutor 结束其操作(比如关闭或终止),
// 相应的 terminationFuture 将会被通知,随后触发上面terminationListener。
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 存储所有的eventLoop,不可修改的EventLoop到 readonlyChildren 变量
// ...
}
这里主要就是初始化children数组这个当前类成员变量,对应一定数量的EventLoop。
另外:1)、如果executor为空的话,创建ThreadPerTaskExecutor对象,参数使用线程工厂类DefaultThreadFactory。2)、根据children的大小创建ExecutorChooser,如何选择下一个EventLoop。
ThreadPerTaskExecutor比较简单就实现了Executor#execute方法:使用DefaultThreadFactory生成Thread然后start启动线程执行任务。
下面重点看下此句:children[i] = newChild(executor, args);
// io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// 通过args获取3个参数,分别是EventLoopGroup构造过程中创建的SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
// 创建 NioEventLoop时taskQueueFactory与tailTaskQueueFactory为null
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
// ...
// 创建 NioEventLoop
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
所以,newChild创建NioEventLoop实例,赋值给EventExecutor。
4、继续看NioEventLoop的构造器,创建NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
// 调用父类构造方法,SingleThreadEventLoop
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
// ...
// provider.openSelector() 创建的选择器,以及对应的优化的选择器。优化的选择器二元组(也可能未优化)
final SelectorTuple selectorTuple = openSelector();
// 包装的选择器(也可能是未包装的)
this.selector = selectorTuple.selector;
// 未包装的选择器
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
可以看到其核心做了3件事:
1)、调用newTaskQueue创建任务队列
2)、调用父类构造器
3)、包装Selector选择器(检测多个Channel通道的IO事件变化)
首先调用newTaskQueue创建任务队列,队列长度为Integer最大值,底层是JCTools的MpscUnboundedArrayQueue队列,名字可以看出其为多生产者单消费者无界队列,同时是线程安全的。
其次调用父类构造器(-> SingleThreadEventLoop -> SingleThreadEventExecutor)完成对应变量的赋值,需要注意NioEventLoop调用父类构造器SingleThreadEventLoop时,传入两个队列taskQueue和tailQueue,其中taskQueue是主要且优先的任务队列,tailQueue中的任务优先级会低一些,在taskQueue任务执行完后会执行tailQueue的。
最后包装Selector选择器,这是Netty高性能的另一体现,对应方法openSelector();
// io.netty.channel.nio.NioEventLoop#openSelector
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 创建一个选择器,这里的选择器是未包装的,下面会根据条件决定是否要进行优化
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果禁用了优化,直接返回,默认是不禁用的
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 反射查找sun.nio.ch.SelectorImpl类
Object maybeSelectorImplClass = 类Class
// ...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// netty中定义Set集合SelectedSelectionKeySet,内部结构是SelectionKey类型数组,用于存放SelectionKey,容量大小1024
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 反射,使用selectedKeySet来替换选择器类selectorImplClass中的selectedKeys和publicSelectedKeys字段
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
// ...
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 返回一个二元组,包含了未包装的选择器和包装的选择器
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
部分代码已省去。代码中final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
SelectedSelectionKeySet类是一个Set集合类,继承了AbstractSet<SelectionKey>,内部结构是SelectionKey类型数组,用于存放SelectionKey,容量大小1024。该集合通过反射赋值给原来Selector选择器的selectedKeys和publicSelectedKeys字段,实现替换。最后NioEventLoop的selector选择器变量使用new SelectedSelectionKeySetSelector(继承了Selector)对象进行包装。
那么为什么要替换SelectionKey集合呢?
因为Netty实现的是数组类型,在空间上元素不用额外的数据结构entry进行包装;在时间上遍历不用再Hash寻址及使用迭代器,数组地址相邻,访问更快。
至此,两个NioEventLoopGroup下的所需NioEventLoop均创建完成。
new NioEventLoopGroup(1)对应1个NioEventLoop;
new NioEventLoopGroup()对应16个NioEventLoop;(因为电脑是8核)
执行循环处理
NioEventLoop创建完成后,对应IO事件和任务所需要的线程创建完成了,接下来看看NioEventLoop事件循环是如何轮询和执行任务的。
回到引导类的配置和启动流程,对应上篇文章可以查看>,当主线程第一个次向某个Executor提交任务的时候,会触发EventLoop执行事件循环来持续检测和处理IO事件和任务。
// io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
/**
* 启动线程,会调用 SingleThreadEventExecutor.this.run();
*/
private void doStartThread() {
// 线程需要是null的情况
assert thread == null;
// executor最终会使用ThreadPerTaskExecutor的threadFactory创建一个线程,并立马执行
executor.execute(new Runnable() {
@Override
public void run() {
// ...
boolean success = false;
updateLastExecutionTime();
try {
// 核心方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
// ...
}
});
}
先说下executor,如图它是一个包装类,包括了两个成员变量。
最终会使用第一个变量executor创建一个新线程,并指定当前线程对应的EventExecutor为第二个变量NioEventLoop。
接下来看核心方法,新线程执行触发SingleThreadEventExecutor.this.run();,这里的SingleThreadEventExecutor.this写法是一种内部类引用外部类对象的方式,此处即调用SingleThreadEventExecutor类NioEventLoop对象的run方法,如下:
// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
int selectCnt = 0;
// 无限循环,EventLoop不断的处理I/O事件和任务
for (;;) {
try {
int strategy;
try {
// 通过selectStrategy来计算接下来的操作
// 如果没有任务,返回策略:SelectStrategy.SELECT;
// 如果有任务,不阻塞的selectNow一下,看看有没有IO事件,没有则返回0,赋值给strategy。switch的值都是负数,所以不会进入switch-case里面
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
// 继续等待
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
// 不支持忙等待,所以走SELECT
case SelectStrategy.SELECT: // 没有任务时进入
// 下一个"定时任务"的截止时间,没有则为-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// NONE 等于 Long.MAX_VALUE;
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 判断是否有任务执行
if (!hasTasks()) {
// 阻塞指定的时间,如果没有事件到达,就会返回0
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
// 进入这里说明执行过select
// 更新计数器及相关标志位
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
// IO任务处理时间占比,默认为50,不管ioRatio值为多少,最后都会执行runAllTasks方法
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// 如果ioRatio为100,先处理IO事件
try {
if (strategy > 0) {
// strategy > 0,说明有IO事件发生,需要处理
// 处理IO事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 处理非IO任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 计算处理IO事件的时间
final long ioTime = System.nanoTime() - ioStartTime;
// 基于IO的运行时间计算非IO任务执行时间,然后执行任务(每64个任务进行一次时间的判断)
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 没有IO任务,只执行任务。时间0表示会执行一批任务,目前是最多64个
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
// 如果运行了任务 或 上面有IO事件发生
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
// unexpectedSelectorWakeup 检查是否发生了意外的选择器唤醒。这是一个异常情况,通常不应该发生,除非有外部因素或者内部错误导致选择器提前返回。
selectCnt = 0;
}
} catch (XXXException e) {
// ...
} finally {
// 尽管循环处理抛出异常,也始终处理关闭。
// ...
}
}
}
}
在分析方法之前,先了解下,选择器的几种检测IO事件的方法:
selector.selectNow():不阻塞;不管是否有读写事件发生,都会立即返回。
selector.select(timeoutMillis):阻塞;如果没有读写事件发生,会阻塞timeoutMillis毫秒,如果超时还没有读写事件发生,就会返回0。
selector.select():阻塞;如果没有读写事件发生,会一直阻塞,直到有读写事件发生。
对应NioEventLoop#run方法,共包括了6步:
1)、首先通过for进行无限循环,EventLoop不断的检查和处理I/O事件和任务。
2)、通过selectStrategy来计算接下来的操作,先通过hasTasks()判断是否有任务。
这里的hasTasks(),上文提到过有两个队列taskQueue和tailQueue,有一个不为空则表示又任务会返回true
2.1)、如果没有任务,返回策略SelectStrategy的SELECT类型; 进入switch的SELECT策略,判断需要阻塞多久,此时会判断是否有时间调度类型的任务(PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue),如果没有则阻塞Long.MAX_VALUE;
2.2)、如果有任务,执行selectNow(),返回的值为IO事件数;不管IO事件数是多少,因为有任务要执行,不会进入switch-case,然后进入第3)步
3)、更新计数器及相关标志位,主要关注selectCnt,下面单独介绍有关selectCnt的作用。
这里有个ioRatio变量,它表示IO任务处理时间占比,默认为50,值越大IO任务时间占比越长,值越小,非IO任务的执行时间占比越长,不过不管ioRatio值为多少,最后都会执行runAllTasks方法。
4)、执行processSelectedKeys();处理IO事件,会判断是否需要使用优化的Selector(上文提到过),而后拿到SelectionKey集合数据,判断对应的Channel通道及IO事件。
// io.netty.channel.nio.NioEventLoop#processSelectedKeys
/**
* 处理IO事件
*/
private void processSelectedKeys() {
// 如果存在优化的selectedKeys,说明是优化的处理方式
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
5)、紧接着执行队列任务,这里也有3种方式:
runAllTasks():无时间限制处理所有任务;
runAllTasks(timeoutNanos):执行时间有时间限制,ioRatio变量决定,每64个任务进行一次时间的判断;
runAllTasks(0):尽量少的执行任务,只会执行一批次即最多64个。
不管哪种方式,执行的任务队列顺序为:scheduledTaskQueue -> taskQueue -> tail,其中scheduledTaskQueue对应着时间调度任务队列。
上面提到的64个任务,在runAllTasks的表现如下:
// 由于nanoTime()相对昂贵,因此每64个任务(二进制与运算)检查一次超时。
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}
6)、有关selectCnt变量,如果运行了任务 或 上面有IO事件发生,一切正常,则selectCnt重置为0。在JDK7的版本中有个关NIO Selector空转导致CPU 100%的bug,Netty通过绕过该bug进行规避,就是判断selectCnt变量大小,如果没有任务且没有IO事件,selectCnt仍在上升,当超过阈值512时,则会重建Selector,对应unexpectedSelectorWakeup(selectCnt)进行判断与重建。
至此,事件循环EventLoop所做的事情:不停的监测和执行IO与任务。
这里还要一个问题需要说明,Selector选择器执行select()方法,处于一直阻塞中,那么对于往EventLoop增加的任务何时执行呢?
// io.netty.channel.nio.NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
// 不在事件循环中,且nextWakeupNanos不等于AWAKE
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 触发事件循环(EventLoop)从等待状态(如select调用)中返回
selector.wakeup();
}
}
这里面涉及一个变量nextWakeupNanos,它表示下次唤醒到当前的时间,默认是AWAKE即-1,当进入Selector的select()或select(阻塞时间)时便会更新nextWakeupNanos为非-1(select对应Long最大值,select(阻塞时间)即阻塞时间大小),即nextWakeupNanos为-1时不需要进行唤醒,Selector没有进入select阻塞。
wakeup方法执行的时机是在execute提交任务时,指定是否立即执行,且当前线程不是EventLoop所在线程中,且nextWakeupNanos变量值不是-1(同时置成-1),则进行IO事件监测唤醒。
总结
Netty是一个高性能的异步及事件驱动网络框架,通过事件循环来监测和处理IO事件及非IO任务。
在事件循环之前,通过NioEventLoopGroup完成NioEventLoop的创建,同时在“主”的线程组中注册监测Accept事件,而后创建的SocketChannel交由“工作”线程组完成IO读写等事件。
这对应于Reactor模式中“多Reactor多线程”的模式,只不过Netty中的业务逻辑处理也是有EventLoop所在线程执行,Netty是为了避免不必要的线程切换,Netty也推荐在同一个EventLoop线程中处理相关的业务逻辑。如果涉及到长时间的任务,可以在业务处理时再增加线程池处理。