前言

本篇主要介绍Netty中一个重要的概念事件循环,及在非阻塞IO场景与事件循环相关的两个关键的类NioEventLoopGroup和NioEventLoop。文章切入点也是在服务器启动的主程序中。希望可以说清楚下面两点:

1、NioEventLoopGroup与NioEventLoop的概念与联系;

2、从源码的角度分析事件循环是如何进行的;

服务器启动主流程的源码分析,可以看Netty的另一篇文章>。

Netty版本:netty-4.1.106.Final

概念与联系

先看看事件循环概念,从字面上来说,所谓“事件”是指各种IO事件和要执行的任务,如建立连接,读写等;“循环”是指处理过程是持续不断的,一直等待和处理事件。

总结一下,Netty中提到的事件循环是指 EventLoop 对应的线程不停的轮询处理分配给它的Channel上的事件和其它任务。

再看看NioEventLoopGroup与NioEventLoop,在Netty中,NioEventLoop对应非阻塞IO多路复用的事件处理场景。EventLoop是上文事件循环的术语和对应接口。它由一个永远都不会改变的Thread驱动,在其整个生命周期内,进行处理与其关联的Channel的IO事件和任务。

非阻塞:是对于主程序来说,发送IO数据等事件不会阻塞主程序。通过事件回调通知结果。

IO多路复用:一个线程可以对多个Channel进行监听,Selector是抽象层,底层使用操作系统的select、poll、epoll能力。

NioEventLoopGroup与NioEventLoop对应的类图(idea生成):

由图可知:

1、它是JDK线程执行器的派生,对应Netty的线程模型;

2、EventLoop间接继承了ScheduledExecutorService,使其支持任务调度(延迟或周期执行任务)。


几个关键接口的说明:

  • EventExecutorGroup:通过next()方法提供EventExecutor,并管理它们的生命周期及可以全局关闭它,它继承JDK的Executor相关接口,具有任务提交和执行的能力。方法有next()、showdown()、submit(Runnable r)等。

  • EventExecutor:继承EventExecutorGroup,是一个特殊的EventExecutorGroup,它额外提供了一些方法用于很方便的查看当前线程是否在eventLoop事件循环中进行的。如inEventLoop()方法。

  • EventLoopGroup:继承EventExecutorGroup,也是一个特殊的EventExecutorGroup,允许注册Channel,这些Channel在后续可以进行Selector。方法如register(Channel channel)等。

  • EventLoop:继承EventLoopGroup和EventExecutor,Channel注册后,进行处理所有的IO事件。额外提供了parent()方法。

小结一下:

含Loop关键词的针对IO事件场景进行的定义,处理事件循环。

含Executor:属于通用的任务执行机制。

Nio中,EventLoop对应Channel为1:n

Oio中,EventLoop对应Channel为1:1,截止4.1.106.Final版本已标记废除

事件循环源码

创建过程

在Bootstrap引导时,定义EventLoopGroup如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

1、接下来看看构造器初始化NioEventLoopGroup,具体都做了哪些事。

进入NioEventLoopGroup(1)构造器查看:

public NioEventLoopGroup(int nThreads) {
	this(nThreads, (Executor) null);
}

如果nThreads不传,nThreads赋值为0,下面会提到对0的处理。

调用内部重载构造器:

public NioEventLoopGroup(int nThreads, Executor executor) {
    // SelectorProvider.provider() 初始化一个JDK NIO的SelectorProvider
    this(nThreads, executor, SelectorProvider.provider());
}

SelectorProvider属于JDK的NIO相关类,这里会创建对应的实例。

继续调用重载构造器,会依次创建,如下:

  • DefaultSelectStrategyFactory.INSTANCE:选择器执行策略工厂类,决定着后续如何做事件循环,目前就这一种实现。

  • 任务添加失败时的拒绝策略,new RejectedExecutionHandler(),会抛出异常RejectedExecutionException。

2、继续调用父类MultithreadEventLoopGroup构造器

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // DEFAULT_EVENT_LOOP_THREADS,默认为CPU核心数 * 2
    // 调用父类MultithreadEventExecutorGroup的构造方法
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

此处会判断传入线程数量是否为0,为0表示未设置,则默认为机器CPU核数*2。

3、继续调用父类MultithreadEventExecutorGroup构造器

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

这里主要关注,实例化一个ExecutorChooser的工厂,工厂生成的类决定如何选择下一个EventLoop。

DefaultEventExecutorChooserFactory.INSTANCE 单例模式,有两个内部类,也是对应工厂生成的类:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,PowerOfTwoEventExecutorChooser 用于线程数为2的幂次方的情况,获取下一个线程使用&计算更快(Netty的高性能体现)。

继续调用构造器:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
	// 初始化是为null
    if (executor == null) {
        // 默认使用ThreadPerTaskExecutor,其实现了JDK的Executor接口;就一个方法,使用线程工厂创建线程并执行
        // 参数newDefaultThreadFactory()对应线程工厂类:DefaultThreadFactory
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 创建EventExecutor类型的数组,长度为nThreads
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 核心方法,创建EventExecutor,由子类实现
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 初始化不成功的情况,优雅关闭之前所有已经初始化的线程
            // ...
        }
    }

    // 创建eventLoop的选择器,用于选择下一个eventLoop使用
    chooser = chooserFactory.newChooser(children);

    // Future完成监听器,当所有的eventLoop都关闭时,设置terminationFuture为成功
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // 当一个 EventExecutor 结束其操作(比如关闭或终止),
    // 相应的 terminationFuture 将会被通知,随后触发上面terminationListener。
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 存储所有的eventLoop,不可修改的EventLoop到 readonlyChildren 变量
	// ...
}

这里主要就是初始化children数组这个当前类成员变量,对应一定数量的EventLoop。

另外:1)、如果executor为空的话,创建ThreadPerTaskExecutor对象,参数使用线程工厂类DefaultThreadFactory。2)、根据children的大小创建ExecutorChooser,如何选择下一个EventLoop。

ThreadPerTaskExecutor比较简单就实现了Executor#execute方法:使用DefaultThreadFactory生成Thread然后start启动线程执行任务。


下面重点看下此句:children[i] = newChild(executor, args);

// io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // 通过args获取3个参数,分别是EventLoopGroup构造过程中创建的SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
	// 创建 NioEventLoop时taskQueueFactory与tailTaskQueueFactory为null
    EventLoopTaskQueueFactory taskQueueFactory = null;
    EventLoopTaskQueueFactory tailTaskQueueFactory = null;
	// ...
    // 创建 NioEventLoop
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

所以,newChild创建NioEventLoop实例,赋值给EventExecutor。

4、继续看NioEventLoop的构造器,创建NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {

    // 调用父类构造方法,SingleThreadEventLoop
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
	// ...
    // provider.openSelector() 创建的选择器,以及对应的优化的选择器。优化的选择器二元组(也可能未优化)
    final SelectorTuple selectorTuple = openSelector();
    // 包装的选择器(也可能是未包装的)
    this.selector = selectorTuple.selector;
    // 未包装的选择器
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

可以看到其核心做了3件事:

1)、调用newTaskQueue创建任务队列

2)、调用父类构造器

3)、包装Selector选择器(检测多个Channel通道的IO事件变化)

首先调用newTaskQueue创建任务队列,队列长度为Integer最大值,底层是JCTools的MpscUnboundedArrayQueue队列,名字可以看出其为多生产者单消费者无界队列,同时是线程安全的。

其次调用父类构造器(-> SingleThreadEventLoop -> SingleThreadEventExecutor)完成对应变量的赋值,需要注意NioEventLoop调用父类构造器SingleThreadEventLoop时,传入两个队列taskQueue和tailQueue,其中taskQueue是主要且优先的任务队列,tailQueue中的任务优先级会低一些,在taskQueue任务执行完后会执行tailQueue的。

最后包装Selector选择器,这是Netty高性能的另一体现,对应方法openSelector();

// io.netty.channel.nio.NioEventLoop#openSelector
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // 创建一个选择器,这里的选择器是未包装的,下面会根据条件决定是否要进行优化
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 如果禁用了优化,直接返回,默认是不禁用的
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    // 反射查找sun.nio.ch.SelectorImpl类
    Object maybeSelectorImplClass = 类Class
	// ...

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
	// netty中定义Set集合SelectedSelectionKeySet,内部结构是SelectionKey类型数组,用于存放SelectionKey,容量大小1024
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 反射,使用selectedKeySet来替换选择器类selectorImplClass中的selectedKeys和publicSelectedKeys字段
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
 	// ...

    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // 返回一个二元组,包含了未包装的选择器和包装的选择器
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

部分代码已省去。代码中final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

SelectedSelectionKeySet类是一个Set集合类,继承了AbstractSet<SelectionKey>,内部结构是SelectionKey类型数组,用于存放SelectionKey,容量大小1024。该集合通过反射赋值给原来Selector选择器的selectedKeys和publicSelectedKeys字段,实现替换。最后NioEventLoop的selector选择器变量使用new SelectedSelectionKeySetSelector(继承了Selector)对象进行包装。

那么为什么要替换SelectionKey集合呢

因为Netty实现的是数组类型,在空间上元素不用额外的数据结构entry进行包装;在时间上遍历不用再Hash寻址及使用迭代器,数组地址相邻,访问更快。

至此,两个NioEventLoopGroup下的所需NioEventLoop均创建完成。

new NioEventLoopGroup(1)对应1个NioEventLoop;

new NioEventLoopGroup()对应16个NioEventLoop;(因为电脑是8核)

执行循环处理

NioEventLoop创建完成后,对应IO事件和任务所需要的线程创建完成了,接下来看看NioEventLoop事件循环是如何轮询和执行任务的。

回到引导类的配置和启动流程,对应上篇文章可以查看>,当主线程第一个次向某个Executor提交任务的时候,会触发EventLoop执行事件循环来持续检测和处理IO事件和任务。

// io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
/**
 * 启动线程,会调用 SingleThreadEventExecutor.this.run();
 */
private void doStartThread() {
    // 线程需要是null的情况
    assert thread == null;
    // executor最终会使用ThreadPerTaskExecutor的threadFactory创建一个线程,并立马执行
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // ...
            boolean success = false;
            updateLastExecutionTime();
            try {
                // 核心方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            }
			// ...
        }
    });
}

先说下executor,如图它是一个包装类,包括了两个成员变量。

最终会使用第一个变量executor创建一个新线程,并指定当前线程对应的EventExecutor为第二个变量NioEventLoop。

接下来看核心方法,新线程执行触发SingleThreadEventExecutor.this.run();,这里的SingleThreadEventExecutor.this写法是一种内部类引用外部类对象的方式,此处即调用SingleThreadEventExecutor类NioEventLoop对象的run方法,如下:

// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
    int selectCnt = 0;
    // 无限循环,EventLoop不断的处理I/O事件和任务
    for (;;) {
        try {
            int strategy;
            try {
                // 通过selectStrategy来计算接下来的操作
                //  如果没有任务,返回策略:SelectStrategy.SELECT;
                //  如果有任务,不阻塞的selectNow一下,看看有没有IO事件,没有则返回0,赋值给strategy。switch的值都是负数,所以不会进入switch-case里面
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    // 继续等待
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                        // 不支持忙等待,所以走SELECT

                    case SelectStrategy.SELECT: // 没有任务时进入
                        // 下一个"定时任务"的截止时间,没有则为-1
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // NONE 等于 Long.MAX_VALUE;
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 判断是否有任务执行
                            if (!hasTasks()) {
                                // 阻塞指定的时间,如果没有事件到达,就会返回0
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            // 进入这里说明执行过select
            // 更新计数器及相关标志位
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // IO任务处理时间占比,默认为50,不管ioRatio值为多少,最后都会执行runAllTasks方法
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // 如果ioRatio为100,先处理IO事件
                try {
                    if (strategy > 0) {
                        // strategy > 0,说明有IO事件发生,需要处理
                        // 处理IO事件
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    // 处理非IO任务
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 计算处理IO事件的时间
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 基于IO的运行时间计算非IO任务执行时间,然后执行任务(每64个任务进行一次时间的判断)
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // 没有IO任务,只执行任务。时间0表示会执行一批任务,目前是最多64个
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            // 如果运行了任务 或 上面有IO事件发生
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }

                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                // unexpectedSelectorWakeup 检查是否发生了意外的选择器唤醒。这是一个异常情况,通常不应该发生,除非有外部因素或者内部错误导致选择器提前返回。
                selectCnt = 0;
            }
        } catch (XXXException e) {
           // ...
        } finally {
            // 尽管循环处理抛出异常,也始终处理关闭。
            // ...
            }
        }
    }
}

在分析方法之前,先了解下,选择器的几种检测IO事件的方法:

  • selector.selectNow():不阻塞;不管是否有读写事件发生,都会立即返回。

  • selector.select(timeoutMillis):阻塞;如果没有读写事件发生,会阻塞timeoutMillis毫秒,如果超时还没有读写事件发生,就会返回0。

  • selector.select():阻塞;如果没有读写事件发生,会一直阻塞,直到有读写事件发生。

对应NioEventLoop#run方法,共包括了6步:

1)、首先通过for进行无限循环,EventLoop不断的检查和处理I/O事件和任务。

2)、通过selectStrategy来计算接下来的操作,先通过hasTasks()判断是否有任务。

这里的hasTasks(),上文提到过有两个队列taskQueue和tailQueue,有一个不为空则表示又任务会返回true

2.1)、如果没有任务,返回策略SelectStrategy的SELECT类型; 进入switch的SELECT策略,判断需要阻塞多久,此时会判断是否有时间调度类型的任务(PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue),如果没有则阻塞Long.MAX_VALUE;

2.2)、如果有任务,执行selectNow(),返回的值为IO事件数;不管IO事件数是多少,因为有任务要执行,不会进入switch-case,然后进入第3)步

3)、更新计数器及相关标志位,主要关注selectCnt,下面单独介绍有关selectCnt的作用。

这里有个ioRatio变量,它表示IO任务处理时间占比,默认为50,值越大IO任务时间占比越长,值越小,非IO任务的执行时间占比越长,不过不管ioRatio值为多少,最后都会执行runAllTasks方法。

4)、执行processSelectedKeys();处理IO事件,会判断是否需要使用优化的Selector(上文提到过),而后拿到SelectionKey集合数据,判断对应的Channel通道及IO事件。

// io.netty.channel.nio.NioEventLoop#processSelectedKeys
/**
 * 处理IO事件
 */
private void processSelectedKeys() {
    // 如果存在优化的selectedKeys,说明是优化的处理方式
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

5)、紧接着执行队列任务,这里也有3种方式:

  • runAllTasks():无时间限制处理所有任务;

  • runAllTasks(timeoutNanos):执行时间有时间限制,ioRatio变量决定,每64个任务进行一次时间的判断;

  • runAllTasks(0):尽量少的执行任务,只会执行一批次即最多64个。

不管哪种方式,执行的任务队列顺序为:scheduledTaskQueue -> taskQueue -> tail,其中scheduledTaskQueue对应着时间调度任务队列。

上面提到的64个任务,在runAllTasks的表现如下:

// 由于nanoTime()相对昂贵,因此每64个任务(二进制与运算)检查一次超时。
if ((runTasks & 0x3F) == 0) {
    lastExecutionTime = getCurrentTimeNanos();
    if (lastExecutionTime >= deadline) {
        break;
    }
}

6)、有关selectCnt变量,如果运行了任务 或 上面有IO事件发生,一切正常,则selectCnt重置为0。在JDK7的版本中有个关NIO Selector空转导致CPU 100%的bug,Netty通过绕过该bug进行规避,就是判断selectCnt变量大小,如果没有任务且没有IO事件,selectCnt仍在上升,当超过阈值512时,则会重建Selector,对应unexpectedSelectorWakeup(selectCnt)进行判断与重建。

对应JDK的Bug地址1>地址2>

至此,事件循环EventLoop所做的事情:不停的监测和执行IO与任务

这里还要一个问题需要说明,Selector选择器执行select()方法,处于一直阻塞中,那么对于往EventLoop增加的任务何时执行呢?

// io.netty.channel.nio.NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
    // 不在事件循环中,且nextWakeupNanos不等于AWAKE
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        // 触发事件循环(EventLoop)从等待状态(如select调用)中返回
        selector.wakeup();
    }
}

这里面涉及一个变量nextWakeupNanos,它表示下次唤醒到当前的时间,默认是AWAKE即-1,当进入Selector的select()或select(阻塞时间)时便会更新nextWakeupNanos为非-1(select对应Long最大值,select(阻塞时间)即阻塞时间大小),即nextWakeupNanos为-1时不需要进行唤醒,Selector没有进入select阻塞

wakeup方法执行的时机是在execute提交任务时,指定是否立即执行,且当前线程不是EventLoop所在线程中,且nextWakeupNanos变量值不是-1(同时置成-1),则进行IO事件监测唤醒。

总结

Netty是一个高性能的异步事件驱动网络框架,通过事件循环来监测和处理IO事件及非IO任务。

在事件循环之前,通过NioEventLoopGroup完成NioEventLoop的创建,同时在“主”的线程组中注册监测Accept事件,而后创建的SocketChannel交由“工作”线程组完成IO读写等事件。

这对应于Reactor模式中“多Reactor多线程”的模式,只不过Netty中的业务逻辑处理也是有EventLoop所在线程执行,Netty是为了避免不必要的线程切换,Netty也推荐在同一个EventLoop线程中处理相关的业务逻辑。如果涉及到长时间的任务,可以在业务处理时再增加线程池处理。