Netty系列-服务端启动流程源码分析
前言
本篇主要基于Netty构建的Http服务器为切入点,通过阅读源码的方式来讲解其引导过程和各个组件的配置与创建过程。
代码对应Netty版本:4.1.106.Final
启动代码
先来看一个服务端例子:
EchoServer启动类:
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx = ServerUtil.buildSslContext();
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler类:
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
示例代码出自Netty官方示例,地址>
服务器入口程序的启动过程:
1、创建引导类ServerBootstrap
2、设置NioEventLoopGroup线程池(boss处理连接事件、worker处理其它io事件)
3、配置非阻塞的服务端传输NioServerSocketChannel
4、创建ChannelPipeline及添加ChannelHandler,数据流转进行业务处理
5、绑定并启动监听端口
EventLoopGroup配置
EventLoopGroup通过 serverBootstrap.group(bossGroup, workerGroup) 进行配置。
看下对应方法:io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
// 调用父类group方法
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
可以看到,ServerBootstrap的配置方法属于链式配置方式,返回this。该group方法传入了两个EventLoopGroup,即在主程序main方法中实例化的 NioEventLoopGroup对象,分别对应着bossGroup和workerGroup,均不能为null。
workerGroup赋值给当前类的childGroup变量,bossGroup赋值给父抽象类AbstractBootstrap的group变量;如果只传入一个EventLoopGroup,那么childGroup和group是同一个。
这里的NioEventLoopGroup对象的创建会初始化对应数量的NioEventLoop类型的“线程”,后面会使用到。至于new NioEventLoopGroup后面新写一篇文章分析“事件循环”。
至此,完成EventLoopGroup配置。
Channel配置
对Channel的配置,指定了NioServerSocketChannel.class
对应类方法:io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
这里的B和C范型类型,可以通过ServerBootstrap的继承关系(public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...}
)得到,B对应ServerBootstrap、C对应ServerChannel。可以得到NioServerSocketChannel类是继承ServerChannel的。
内部调用channelFactory方法,传入一个io.netty.channel.ChannelFactory对象,将创建的ReflectiveChannelFactory对象指向AbstractBootstrap类的channelFactory变量。
如果传入的Channel实现类有无参构造器,更推荐调用channel(Class),正如示例代码一样。
那么new ReflectiveChannelFactory(NioServerSocketChannel.class)得到的是什么呢?
如下所示,io.netty.channel.ReflectiveChannelFactory#ReflectiveChannelFactory:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
该方法是ReflectiveChannelFactory的构造器,它是一个反射Channel工厂,得到传入类NioServerSocketChannel的无参构造器传给constructor变量。
所以,该步骤指定了NioServerSocketChannel的无参构造器,存入ReflectiveChannelFactory类中,ReflectiveChannelFactory类对象又作为AbstractBootstrap类的channelFactory变量。
ChannelHandler配置
接下来看ChannelHandler的配置,首先看handler()方法。
对应类方法:io.netty.bootstrap.AbstractBootstrap#handler(io.netty.channel.ChannelHandler)
/**
* the {@link ChannelHandler} to use for serving the requests.
*/
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
该方法比较简单,指定了ChannelHandler对象,示例中是自定义的LoggingHandler。
再来看childHandler()方法,该方法同样简单,指定了childHandler。
对应类方法:io.netty.bootstrap.ServerBootstrap#childHandler(io.netty.channel.ChannelHandler)
/**
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
这里,传入的ChannelInitializer匿名类,可以借助类的方法initChannel完成一系列的ChannelHandler的添加。
所以,handler()方法指定了AbstractBootstrap的handler,childHandler()方法指定了ServerBootstrap的childHandler。
端口绑定启动服务
对应主程序代码:ChannelFuture f = b.bind(PORT).sync();
当调用ServerBootStrap的bind(PORT)时进行端口的绑定和服务启动。
对应类方法:io.netty.bootstrap.AbstractBootstrap#bind(int)
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
创建InetSocketAddress对象,传入端口,端口号限制0到65535之间。
继续看bind方法:
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
validate()方法进行校验,group和channelFactory不能为null,这里两个参数均在之前步骤已经完成了赋值,group对应着bossGroup,channelFactory对应着ReflectiveChannelFactory。
继续看调用doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 调用到这里
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 进入哪个分支,取决于initAndRegister方法中子线程进行Channel异步注册的结果
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先也是重要的第一句,调用initAndRegister()方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// NioServerSocketChannel创建
channel = channelFactory.newChannel();
// 初始化
init(channel);
} catch (Throwable t) {
// ...
}
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}
initAndRegister()方法共包括了三大步:
NioServerSocketChannel创建
创建后的channel初始化
Channel的异步注册返回ChannelFuture
1、NioServerSocketChannel创建
initAndRegister方法进行Channel创建,这里使用的是channelFactory变量,即ReflectiveChannelFactory对象,调用对应的newChannel();方法
方法比较简单 return constructor.newInstance();
这里的constructor对应着主程序传入的NioServerSocketChannel的无参构造器,进而通过反射进行实例化。
进入类方法:io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
DEFAULT_SLECTOR_PROVIDER对应于JDK nio中的SelectorProvider类,所以,底层还是借助于JDK的NIO实现。
SelectorProvider是JDK的抽象类,用于创建NIO的核心对象,如Selector,ServerSocketChannel、SocketChannel等。
紧接着调用:
/**
* Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
*/
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
this(newChannel(provider, family));
}
这里有两步,第一步newChannel方法,第二步this构造器
看第一步,newChannel(provider, family)方法,family为null,先不用考虑
private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
// OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY静态变量,查找SelectorProvider中有FAMILY变量的Method,未找到,为null
ServerSocketChannel channel =
SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
return channel == null ? provider.openServerSocketChannel() : channel;
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
这里会执行provider.openServerSocketChannel(),完成ServerSocketChannel的创建。
看第二步,其它构造器:
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里调用父类构造器,核心干了两件事,指定监听的事件:SelectionKey.OP_ACCEPT,非阻塞;同时内部完成ChannelPipeline的初始化(初始化pipeline最后再看)。
2、创建后的channel初始化
主要体现在initAndRegister方法中,调用init(channel)方法。
对应类方法:io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
extensions));
}
});
}
});
if (!extensions.isEmpty() && channel instanceof ServerChannel) {
ServerChannel serverChannel = (ServerChannel) channel;
for (ChannelInitializerExtension extension : extensions) {
try {
extension.postInitializeServerListenerChannel(serverChannel);
} catch (Exception e) {
logger.warn("Exception thrown from postInitializeServerListenerChannel", e);
}
}
}
}
配置可选项和属性;同时pipeline通过addLast方法增加ChannelInitializer匿名类,目前initChannel方法还不会被执行。
前文所有步骤,还都是main线程在调用,下面这一步就会涉及到异步eventLoop的线程。
3、Channel的异步注册返回ChannelFuture
主要体现在initAndRegister方法中,调用 ChannelFuture regFuture = config().group().register(channel); 就是调用bossGroup的register(channel)方法。
对应类方法:io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next方法会进入到一个叫MultithreadEventExecutorGroup类next中,再调用DefaultEventExecutorChooserFactory的next方法轮询得到NioEventLoop。
进入的是io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)的register方法,
最后通过刚eventLoop线程execute提交异步任务执行:io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// JDK NIO的channel注册选择器Selector
doRegister();
neverRegistered = false;
registered = true;
// 添加初始化时候的ChannelHandler到pipeline中
pipeline.invokeHandlerAddedIfNeeded();
// 回调promise
safeSetSuccess(promise);
//
pipeline.fireChannelRegistered();
// ...
} catch (Throwable t) {
// ...
}
}
该方法对应如下几步:
1、doRegister():完成JDK NIO的Channel注册选择器Selector;但是没有配置感兴趣的事件,即ops为0。
2、进行状态的调整;
3、pipeline.invokeHandlerAddedIfNeeded():进行调用上面三大步骤中的第二步,完成initChannel中ChannelHandler的添加到ChannelPipeline中,同时创建ChannelHandlerContext加入链表;以及添加ServerBootstrapAcceptor处理器。
4、safeSetSuccess(promise):回调promise的监听,即对应ChannelFuture添加的ChannelFutureListener,这里有可能会触发doBind()方法中的ChannelFutureListener回调(为啥是有可能呢?主要因为主线程在调用完initAndRegister后,regFuture有没有拿到结果。)
5、pipeline.fireChannelRegistered():pipeline触发ChannelRegistereds事件,所有加入pipeline中的InboundHandler都会被触发执行。
这里强调下,eventLoop在通过execute提交任务时,触发EventLoop中的run方法,在无限的轮训任务或者阻塞IO监听事件。
完成上面三大步,即完成了iniAndRegister方法,此时回到doBind方法,main线程进行后续执行。
对应类io.netty.bootstrap.AbstractBootstrap#doBind(SocketAddress):
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
// 到这里了
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// regFuture对应着异步操作,这里if else都有可能,如果判断前sleep 1s可以大概率进入if语句体
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
拿到回调regFuture,isDone()判断是否完成。regFuture对应着Channel在子线程中注册的进度,如果完成regFuture.isDone()返回true(完成设置动作在AbstractUnsafe#register0中对safeSetSuccess(promise);的调用),否则返回false。
不管怎么样,都会调用到doBind0方法,完成Channel绑定本地端口。
doBind0会发起ChannelPipeline调用,因为是程序发起,所以从末端开始找outboundHandler,最后到HeadContext,调用AbstractChannel#AbstractNioUnsafe方法完成Channel对端口的绑定。
端口绑定成功后,会调用pipeline.fireChannelActive(); 该方法调用pipeline.readIfIsAutoRead()完成对接收事件SelectionKey.OP_ACCEPT的添加。selectionKey.interestOps(interestOps | readInterestOp); // 此时readInterestOp为SelectionKey.OP_ACCEPT。
ChannelPipeline初始化
在“NioServerSocketChannel创建”的步骤中,会创建Channel对应的ChannelPipeline,对应的实现DefaultChannelPipeline(Channel);
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
初始化时定了tail和head变量,并进行双向指向。这个双向链表对应着ChannelHandlerContext,而每个ChannelHandlerContext关联着ChannelHandler,所以在pipeline中进行ChannelHandler调用时,通过双向链表找到对应的ChannelHandlerContext,再完成ChannelHandler的调用。(具体的ChannelHandler执行顺序相关介绍查看另一篇文章>)
总结
文章从服务器的启动示例代码切入,深入源码介绍了启动涉及的组件和逻辑,文章中关于NioEventLoopGroup和EventLoop这块的线程模型介绍偏少,后续单独写一篇文章介绍吧。
服务器端启动程序的调用图大概如下:
其中1)、2)、3)对应着核心方法AbstractBootstrap#initAndRegister()方法,且第3)步为异步调用。