Netty系列-组件
前言
本篇会通过一个示例引入Netty的相关组件,以及这些组件的关系,这也是Netty的核心,任何功能的扩展都是基于某个或某些组件。
一个例子
例子是实现一个服务端的http服务,其来源于github Netty example模块,代码地址>。
服务端启动入口:
/**
* An HTTP server that sends back the content of the received HTTP request
* in a pretty plaintext form.
*/
public final class HttpHelloWorldServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8081"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx = ServerUtil.buildSslContext();
// Configure the server.
// 创建两个EventLoopGroup,作为两个独立的线程池
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服务端引导类ServerBootstrap,对所有组件进行配置使其进行启动
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
// 配置线程池
b.group(bossGroup, workerGroup)
// 配置非阻塞的服务端传输Channel
.channel(NioServerSocketChannel.class)
// 处理连接io和其它事件io的处理器
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx));
// 异步绑定端口,调用sync()直到绑定成功。
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
// 阻塞Channel直到其关闭
ch.closeFuture().sync();
} finally {
// 优雅关闭EventLoopGroup
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
添加处理器:
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec());
p.addLast(new HttpContentCompressor((CompressionOptions[]) null));
p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new HttpHelloWorldServerHandler());
}
}
自定义处理器:
public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<HttpObject> {
private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd' };
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
boolean keepAlive = HttpUtil.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK,
Unpooled.wrappedBuffer(CONTENT));
response.headers()
.set(CONTENT_TYPE, TEXT_PLAIN)
.setInt(CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
if (!req.protocolVersion().isKeepAliveDefault()) {
response.headers().set(CONNECTION, KEEP_ALIVE);
}
} else {
// Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE);
}
ChannelFuture f = ctx.write(response);
if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
运行:启动服务端入口代码;浏览器访问http://127.0.0.1:8081/,成功返回Hello World。
简单介绍下服务器入口程序的启动过程:
1、创建引导类ServerBootstrap
2、设置NioEventLoopGroup线程池(boss处理连接事件、worker处理读写等其它IO事件)
3、配置非阻塞的服务端传输管道NioServerSocketChannel
4、创建ChannelPipeline及添加ChannelHandler,处理数据
5、启动并绑定监听端口
组件
示例中出现的的组件有:EventLoopGroup、Channel、ChannelHandler、ChannelPipeline、ChannelFuture、ServerBootstrap、HttpServerCodec;以及HttpServerCodec用到ByteBuf,下面一一进行介绍。
Channel
这个Channel不是Java NIO的Channel,它是Java NIO的Channel的高级封装,提供了更多网络通讯和数据处理的功能,如关联ChannelPipeline等。它表示一个到实体(如一个硬件设备、一个文件、一个网络套接字)的开放连接,如读操作或写操作。
Netty中,可以把Channel看作是入站和出站数据的载体。
EventLoopGroup
如图,EventLoopGroup继承于JDK的线程池。EventLoopGroup负责管理一组线程,每个线程由子类EventLoop来代表,这些线程用于处理网络事件和执行任务。每个EventLoop对应一个特定的Thread线程,一旦线程被分配便不会改动。
EventLoopGroup与EventLoop的关系可以类比JDK中线程池与线程的关系。
如图,
所有的EventLoop由其所属的EventLoopGroup分配,所以EventLoop有parent()可以找到分配它的EventLoopGroup。
EventLoop对应于一个特定的线程,EventLoop处理分配给它的所有Channel的事件和任务。
EventLoopGroup为每个新创建的channel分配一个EventLoop,在整个Channel的生命周期内,所有的操作都由其执行。
关于EventLoop多介绍一点,它不仅支持任务的立即执行,同时支持时间调度执行,这里的任务可以是实现了Runnable或Callable接口的任务。
ChannelHandler
ChannelHandler作为事件处理器,在Netty中至关重要,IO的事件处理、数据编解码器、业务逻辑处理都要依赖于ChannelHandler的实现。
ChannelHandler有两个重要的子接口:
ChannelInboundHandler:处理入站数据以及状态的变化
ChannelInboundHandler的生命周期受到Channel生命周期的变化而变化,如新的Channel被创建、Channel上有读事件等。
当某个ChannelInboundHandler的实现重写了channelRead方法并且处理完了ByteBuf,需要负责释放池化的ByteBuf引用计数;如果是实现了SimpleChannelInboundHandler的channelRead0方法,则会自动释放,由其默认构造函数决定。
ChannelOutboundHandler:处理出战数据以及状态的变化
ChannelOutboundHandler的一个强大功能是按需推迟操作和事件。
ChannelHandler有两个适配器了,已经实现了一些基本的功能,可以进行实现自定义自己的ChannelHandler。整体的类继承关系图如下所示:
ChannelPipeline
上面介绍了ChannelHandler,每个ChannelHandler实现都对应着某一个能力,如数据编解码能力、数据压缩能力、安全加密能力等等。在一个IO事件处理的整个流程中,一般都会由几个ChannelHandler一起完成流程处理,ChannelHandler的拼接就是由ChannelPipeline完成的。
每个新创建的Channel都会关联一个唯一且永久的ChannelPipeline。根据事件的起源(外部传入、内部传出),事件经由ChannelPipeline的ChannelInboundHandler或ChannelOutboundHandler依次处理。
关于ChannelPipeline中ChannelInboundHandler与ChannelOutboundHandler的执行顺序,先了解下面的组件关系:
图中包括了Channel、ChannelHandler(处理器)、ChannelPipeline、ChannelHandlerContext,关系如下:
一个Channel关联一个ChannelPipeline
一个ChannelPipeline包括一些列的ChannelHandler(Inbound和Outbound)
在ChannelPipeline添加ChannelHandler时,会创建ChannelHandler对应的ChannelHandlerContext,并以双链表方式相互关联前后ChannelHandlerContext
如果自定义的ChannelHandler共享于多个ChannelPipeline,那么需要使用@Shareable标记自定义的ChannelHandler
ChannelHandlerContext内部可以关联到Channel、ChannelPipeline、ChannelHandler。
再看下有关触发事件,ChannelPipeline和ChannelHandlerContext都有相关Api触发调用下一个ChannelHandler:
以fireChannel*开头的方法,调用“下一个”ChannelInboundHandler;
以非fireChannel*开头(如调用bind、connect、disconnect、write、writeAndFlush等),调用“下一个”ChannelOutboundHandler;
注意下一个不是后一个,也可能是前一个,具体要看是in还是out
关于执行顺序:
如果是ChannelHandlerContext调用,则从当前的ChannelHandlerContext找“下一个”
如果是ChannelPipeline(有些方法Channel也可以调用,内部其实是pipeline)调用,则从端点(头或尾)调用ChannelHandlerContext
如果是触发“下一个”ChannelInboundHandler,则从前往后找
如果是触发”下一个“ChannelOutboundHandler,则从后往前找
顺序举例:
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
1、如果在InboundHandler2执行channelHandlerContext.writeAndFlush(),则不会调用Outbound
2、如果在InboundHandler2执行channelPipeline.writeAndFlush(),则会依次调用Outbound2 -> Outbound1
ChannelFuture
Netty是异步和事件驱动的。JDK内置的Future使用时,需要手动检查或者一直get阻塞直到任务完成,所以Netty扩展了JDK的Future,实现了ChannelFuture。通过注册ChannelFutureListener,当对应操作完成时,进行回调通知,典型的事件驱动模型。
例如下面代码,异步建立连接:
Channel channel = ...;
// 不阻塞
ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", "8080"));
ServerBootstrap
ServerBootstrap是作为一种服务端的引导类,用于完成应用程序的配置与启动,包括但不限于EventLoopGroup、Channel、ChannelHandler。
对应客户端的引导类是Bootstrap;不管客户端还是服务端它们都继承了AbstractBootstrap。
HttpServerCodec
HttpServerCodec是Netty实现应对Http协议的编解码器,它包括了ByteToMessageDecoder和MessageToMessageEncoder
编码器:操作出站数据,将消息编码成字节或另一种消息,主要抽象类MessageToByteEncoder、MessageToMessageEncoder;
解码器:操作入站数据,将字节或一种消息解码成另一种消息,主要抽象类ByteToMessageDecoder、MessageToMessageDecoder
得益于Netty的ChannelPipeline设计,编解码器涉及出入站数据处理,编码器继承于ChannelOutboundHandlerAdapter,解码器继承于ChannelInboundHandlerAdapter。
Netty应对各种场景的内置编解码器,如:
1、用于Http协议场景的客户端使用的编码器 HttpRequestEncoder、解码器 HttpResponseDecoder
2、用于Http协议场景的聚合Http消息的 HttpObjectAggregator
3、用于WebSocket场景心跳检测的 IdleStateHandler
4、基于分隔符的协议的解码器 DelimiterBasedFrameDecoder、LineBaseFrameDecoder
5、基于长度的协议的解码器 FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder
6、Google的Protocol Buffers的编解码器 ProtobufDncoder、ProtocolEncoder
ByteBuf
Netty使用ByteBuf代替JDK中的ByteBuffer,提供了更灵活和强大的数据处理容器。Netty两类数据处理组件包括ByteBuf和ByteBufHolder。
ByteBuf维护了两个索引,一个是读取索引readerIndex,一个是写数索引writeIndex,这样就不用像JDK中的ByteBuffer一样读写模式转换需要调用flip()方法。
起始情况下,readerIndex和writeIndex都为0;在使用以read*开头的方法读取数据时,移动readerIndex;在使用write*开头的方法写数据时,移动writerIndex;但是readerIndex不能超过writeIndex。
某时刻的ByteBuf状态:
如同discardable byte区域:表示可废弃的区域,若调用discardReadBytes()方法后,变成如下所示:
ByteBuf的使用模式包括四种:
1、堆缓冲区(Heap Buffer):
数据存在在JVM堆中,可以快速分配且不使用时可以被GC回收。示例:
ByteBuf heapBuf = Unpooled.buffer(10); // 分配一个新的堆缓冲区
if (heapBuf.hasArray()) {
byte[] array = heapBuf.array();
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes();
// 使用数组、偏移量和长度作为数据的处理
}
2、直接缓冲区(Direct Buffer):
数据存储在操作系统的内存中,适用于长时间存储的大型数据,如网络 I/O 的数据传输。
直接缓冲区可以减少在 JVM 堆和操作系统内存之间复制数据的次数,提高性能。
ByteBuf directBuf = Unpooled.directBuffer(10); // 分配一个新的直接缓冲区
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
directBuf.getBytes(directBuf.readerIndex(), array);
// 使用字节数组处理数据
}
3、复合缓冲区(Composite Buffer):
可以将多个不同的 ByteBuf 合并为一个逻辑单元,避免了拷贝数据的性能损耗。
适用于需要将多个数据片段合并在一起发送的场景。
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(10);
ByteBuf directBuf = Unpooled.directBuffer(10);
compBuf.addComponents(heapBuf, directBuf);
// 遍历复合缓冲区中的所有组件
for (int i = 0; i < compBuf.numComponents(); i++) {
ByteBuf b = compBuf.component(i);
// 处理每个部分的数据
}
4、切片(Slicing)和复制(Duplicating):
切片和复制允许创建一个或多个新的 ByteBuf 实例,它们共享同一个数据存储,但拥有独立的索引和标记。
这对于只读数据或分段处理非常有用。
ByteBuf buffer = Unpooled.buffer(10);
// 写入一些数据到buffer中
ByteBuf sliced = buffer.slice(3, 5); // 创建一个从索引3开始,长度为5的切片
// sliced 和 buffer 共享相同的存储空间,但拥有独立的读写索引
ByteBuf的分配方式包括两种:
1、按需分配(ByteBufAllocator)
Netty提供了内存池(pooled)技术,用于重用ByteBuf,该方式可以减少内存分配和垃圾回收的开销。使用时通过对应的Channel或ChannelHandlerContext获取ByteBufAllocator引用,完成分配。
2、非池化分配(Unpooled)
在未获得ByteBufAllocator引用的情况下,可以使用Unpooled的静态方法完成非池化ByteBuf的创建。
使用池化内存时注意,必须在使用完进行正确的释放,以免出现内存泄漏问题(开发期间可以开启内存泄漏检测机制),释放的方式如果是ByteBuf调用release(),则引用计数减一;如果是使用ReferenceCounted的release()方法,则不管当前值,直接置0。
总结
Netty中用的组件包括Channel、EventLoopGroup、ChannelHandler、ChannelPipeline、ChannelFuture、BootStrap、ByteBuf、编解码器等。每一个都在其中发挥着重要作用。
Channel作为一个到网络的连接,连接着数据的读写操作;
EventLoopGroup作为线程模型的一部分,对创建的Channel进行EventLoop的分配;
ChannelHandler核心接口,业务逻辑、数据处理都离不开它;
ChannelPipeline组织多个ChannelHandler,完成数据与业务流的处理;
ChannelFuture作为异步和事件通知的核心组件;
BootStrap应用程序启动的引导类,分为了服务端和客户端的引导,所谓引导就是设置和组织其它组件;
ByteBuf数据处理的容器,用来装数据的;
编解码器的存在也是Netty的强大之处,Netty内置的编解码器极大的提高了网络程序开发的便捷。