前言

本篇会通过一个示例引入Netty的相关组件,以及这些组件的关系,这也是Netty的核心,任何功能的扩展都是基于某个或某些组件。

一个例子

例子是实现一个服务端的http服务,其来源于github Netty example模块,代码地址>。

服务端启动入口:

/**
 * An HTTP server that sends back the content of the received HTTP request
 * in a pretty plaintext form.
 */
public final class HttpHelloWorldServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8081"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx = ServerUtil.buildSslContext();

        // Configure the server.
        // 创建两个EventLoopGroup,作为两个独立的线程池
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务端引导类ServerBootstrap,对所有组件进行配置使其进行启动
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            // 配置线程池
            b.group(bossGroup, workerGroup)
                    // 配置非阻塞的服务端传输Channel
             .channel(NioServerSocketChannel.class)
                    // 处理连接io和其它事件io的处理器
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HttpHelloWorldServerInitializer(sslCtx));

            // 异步绑定端口,调用sync()直到绑定成功。
            Channel ch = b.bind(PORT).sync().channel();

            System.err.println("Open your web browser and navigate to " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            // 阻塞Channel直到其关闭
            ch.closeFuture().sync();
        } finally {
            // 优雅关闭EventLoopGroup
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

添加处理器:

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpServerCodec());
        p.addLast(new HttpContentCompressor((CompressionOptions[]) null));
        p.addLast(new HttpServerExpectContinueHandler());
        p.addLast(new HttpHelloWorldServerHandler());
    }
}

自定义处理器:

public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd' };

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            boolean keepAlive = HttpUtil.isKeepAlive(req);
            FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK,
                                                                    Unpooled.wrappedBuffer(CONTENT));
            response.headers()
                    .set(CONTENT_TYPE, TEXT_PLAIN)
                    .setInt(CONTENT_LENGTH, response.content().readableBytes());

            if (keepAlive) {
                if (!req.protocolVersion().isKeepAliveDefault()) {
                    response.headers().set(CONNECTION, KEEP_ALIVE);
                }
            } else {
                // Tell the client we're going to close the connection.
                response.headers().set(CONNECTION, CLOSE);
            }

            ChannelFuture f = ctx.write(response);

            if (!keepAlive) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

运行:启动服务端入口代码;浏览器访问http://127.0.0.1:8081/,成功返回Hello World。

简单介绍下服务器入口程序的启动过程

1、创建引导类ServerBootstrap

2、设置NioEventLoopGroup线程池(boss处理连接事件、worker处理读写等其它IO事件)

3、配置非阻塞的服务端传输管道NioServerSocketChannel

4、创建ChannelPipeline及添加ChannelHandler,处理数据

5、启动并绑定监听端口

组件

示例中出现的的组件有:EventLoopGroup、Channel、ChannelHandler、ChannelPipeline、ChannelFuture、ServerBootstrap、HttpServerCodec;以及HttpServerCodec用到ByteBuf,下面一一进行介绍。

Channel

这个Channel不是Java NIO的Channel,它是Java NIO的Channel的高级封装,提供了更多网络通讯和数据处理的功能,如关联ChannelPipeline等。它表示一个到实体(如一个硬件设备、一个文件、一个网络套接字)的开放连接,如读操作或写操作。

Netty中,可以把Channel看作是入站和出站数据的载体。

EventLoopGroup

如图,EventLoopGroup继承于JDK的线程池。EventLoopGroup负责管理一组线程,每个线程由子类EventLoop来代表,这些线程用于处理网络事件和执行任务。每个EventLoop对应一个特定的Thread线程,一旦线程被分配便不会改动

EventLoopGroup与EventLoop的关系可以类比JDK中线程池与线程的关系。

如图,

  1. 所有的EventLoop由其所属的EventLoopGroup分配,所以EventLoop有parent()可以找到分配它的EventLoopGroup。

  2. EventLoop对应于一个特定的线程,EventLoop处理分配给它的所有Channel的事件和任务。

  3. EventLoopGroup为每个新创建的channel分配一个EventLoop,在整个Channel的生命周期内,所有的操作都由其执行。

关于EventLoop多介绍一点,它不仅支持任务的立即执行,同时支持时间调度执行,这里的任务可以是实现了Runnable或Callable接口的任务。

ChannelHandler

ChannelHandler作为事件处理器,在Netty中至关重要,IO的事件处理、数据编解码器、业务逻辑处理都要依赖于ChannelHandler的实现。

ChannelHandler有两个重要的子接口:

  • ChannelInboundHandler处理入站数据以及状态的变化

ChannelInboundHandler的生命周期受到Channel生命周期的变化而变化,如新的Channel被创建、Channel上有读事件等。

当某个ChannelInboundHandler的实现重写了channelRead方法并且处理完了ByteBuf,需要负责释放池化的ByteBuf引用计数;如果是实现了SimpleChannelInboundHandler的channelRead0方法,则会自动释放,由其默认构造函数决定。

  • ChannelOutboundHandler处理出战数据以及状态的变化

ChannelOutboundHandler的一个强大功能是按需推迟操作和事件。

ChannelHandler有两个适配器了,已经实现了一些基本的功能,可以进行实现自定义自己的ChannelHandler。整体的类继承关系图如下所示:

ChannelPipeline

上面介绍了ChannelHandler,每个ChannelHandler实现都对应着某一个能力,如数据编解码能力、数据压缩能力、安全加密能力等等。在一个IO事件处理的整个流程中,一般都会由几个ChannelHandler一起完成流程处理,ChannelHandler的拼接就是由ChannelPipeline完成的

每个新创建的Channel都会关联一个唯一且永久的ChannelPipeline。根据事件的起源(外部传入、内部传出),事件经由ChannelPipeline的ChannelInboundHandler或ChannelOutboundHandler依次处理。

关于ChannelPipeline中ChannelInboundHandler与ChannelOutboundHandler的执行顺序,先了解下面的组件关系:

图中包括了Channel、ChannelHandler(处理器)、ChannelPipeline、ChannelHandlerContext,关系如下

  • 一个Channel关联一个ChannelPipeline

  • 一个ChannelPipeline包括一些列的ChannelHandler(Inbound和Outbound)

  • 在ChannelPipeline添加ChannelHandler时,会创建ChannelHandler对应的ChannelHandlerContext,并以双链表方式相互关联前后ChannelHandlerContext

  • 如果自定义的ChannelHandler共享于多个ChannelPipeline,那么需要使用@Shareable标记自定义的ChannelHandler

ChannelHandlerContext内部可以关联到Channel、ChannelPipeline、ChannelHandler。

再看下有关触发事件,ChannelPipeline和ChannelHandlerContext都有相关Api触发调用下一个ChannelHandler:

  1. 以fireChannel*开头的方法,调用“下一个”ChannelInboundHandler;

  2. 以非fireChannel*开头(如调用bind、connect、disconnect、write、writeAndFlush等),调用“下一个”ChannelOutboundHandler;

注意下一个不是后一个,也可能是前一个,具体要看是in还是out

关于执行顺序

  • 如果是ChannelHandlerContext调用,则从当前的ChannelHandlerContext找“下一个”

  • 如果是ChannelPipeline(有些方法Channel也可以调用,内部其实是pipeline)调用,则从端点(头或尾)调用ChannelHandlerContext

  • 如果是触发“下一个”ChannelInboundHandler,则从前往后找

  • 如果是触发”下一个“ChannelOutboundHandler,则从后往前找

顺序举例:

ch.pipeline().addLast(new InboundHandler1());

ch.pipeline().addLast(new InboundHandler2());

ch.pipeline().addLast(new OutboundHandler1());

ch.pipeline().addLast(new OutboundHandler2());

1、如果在InboundHandler2执行channelHandlerContext.writeAndFlush(),则不会调用Outbound

2、如果在InboundHandler2执行channelPipeline.writeAndFlush(),则会依次调用Outbound2 -> Outbound1

ChannelFuture

Netty是异步和事件驱动的。JDK内置的Future使用时,需要手动检查或者一直get阻塞直到任务完成,所以Netty扩展了JDK的Future,实现了ChannelFuture。通过注册ChannelFutureListener,当对应操作完成时,进行回调通知,典型的事件驱动模型。

例如下面代码,异步建立连接:

Channel channel = ...;
// 不阻塞
ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", "8080"));

ServerBootstrap

ServerBootstrap是作为一种服务端的引导类,用于完成应用程序的配置与启动,包括但不限于EventLoopGroup、Channel、ChannelHandler。

对应客户端的引导类是Bootstrap;不管客户端还是服务端它们都继承了AbstractBootstrap。

HttpServerCodec

HttpServerCodec是Netty实现应对Http协议的编解码器,它包括了ByteToMessageDecoder和MessageToMessageEncoder

  • 编码器:操作出站数据,将消息编码成字节或另一种消息,主要抽象类MessageToByteEncoder、MessageToMessageEncoder;

  • 解码器:操作入站数据,将字节或一种消息解码成另一种消息,主要抽象类ByteToMessageDecoder、MessageToMessageDecoder

得益于Netty的ChannelPipeline设计,编解码器涉及出入站数据处理,编码器继承于ChannelOutboundHandlerAdapter,解码器继承于ChannelInboundHandlerAdapter。

Netty应对各种场景的内置编解码器,如:

1、用于Http协议场景的客户端使用的编码器 HttpRequestEncoder、解码器 HttpResponseDecoder

2、用于Http协议场景的聚合Http消息的 HttpObjectAggregator

3、用于WebSocket场景心跳检测的 IdleStateHandler

4、基于分隔符的协议的解码器 DelimiterBasedFrameDecoder、LineBaseFrameDecoder

5、基于长度的协议的解码器 FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder

6、Google的Protocol Buffers的编解码器 ProtobufDncoder、ProtocolEncoder

ByteBuf

Netty使用ByteBuf代替JDK中的ByteBuffer,提供了更灵活和强大的数据处理容器。Netty两类数据处理组件包括ByteBuf和ByteBufHolder。

ByteBuf维护了两个索引,一个是读取索引readerIndex,一个是写数索引writeIndex,这样就不用像JDK中的ByteBuffer一样读写模式转换需要调用flip()方法。

起始情况下,readerIndex和writeIndex都为0;在使用以read*开头的方法读取数据时,移动readerIndex;在使用write*开头的方法写数据时,移动writerIndex;但是readerIndex不能超过writeIndex

某时刻的ByteBuf状态:

如同discardable byte区域:表示可废弃的区域,若调用discardReadBytes()方法后,变成如下所示:

ByteBuf的使用模式包括四种:

1、堆缓冲区(Heap Buffer):

数据存在在JVM堆中,可以快速分配且不使用时可以被GC回收。示例:

ByteBuf heapBuf = Unpooled.buffer(10); // 分配一个新的堆缓冲区
if (heapBuf.hasArray()) {
    byte[] array = heapBuf.array();
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    int length = heapBuf.readableBytes();
    // 使用数组、偏移量和长度作为数据的处理
}

2、直接缓冲区(Direct Buffer):

数据存储在操作系统的内存中,适用于长时间存储的大型数据,如网络 I/O 的数据传输。

直接缓冲区可以减少在 JVM 堆和操作系统内存之间复制数据的次数,提高性能。

ByteBuf directBuf = Unpooled.directBuffer(10); // 分配一个新的直接缓冲区
if (!directBuf.hasArray()) {
    int length = directBuf.readableBytes();
    byte[] array = new byte[length];
    directBuf.getBytes(directBuf.readerIndex(), array);
    // 使用字节数组处理数据
}

3、复合缓冲区(Composite Buffer):

可以将多个不同的 ByteBuf 合并为一个逻辑单元,避免了拷贝数据的性能损耗。

适用于需要将多个数据片段合并在一起发送的场景。

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(10);
ByteBuf directBuf = Unpooled.directBuffer(10);
compBuf.addComponents(heapBuf, directBuf);
// 遍历复合缓冲区中的所有组件
for (int i = 0; i < compBuf.numComponents(); i++) {
    ByteBuf b = compBuf.component(i);
    // 处理每个部分的数据
}

4、切片(Slicing)和复制(Duplicating):

切片和复制允许创建一个或多个新的 ByteBuf 实例,它们共享同一个数据存储,但拥有独立的索引和标记。

这对于只读数据或分段处理非常有用。

ByteBuf buffer = Unpooled.buffer(10);
// 写入一些数据到buffer中
ByteBuf sliced = buffer.slice(3, 5); // 创建一个从索引3开始,长度为5的切片
// sliced 和 buffer 共享相同的存储空间,但拥有独立的读写索引

ByteBuf的分配方式包括两种:

1、按需分配(ByteBufAllocator)

Netty提供了内存池(pooled)技术,用于重用ByteBuf,该方式可以减少内存分配和垃圾回收的开销。使用时通过对应的Channel或ChannelHandlerContext获取ByteBufAllocator引用,完成分配。

2、非池化分配(Unpooled)

在未获得ByteBufAllocator引用的情况下,可以使用Unpooled的静态方法完成非池化ByteBuf的创建。

使用池化内存时注意,必须在使用完进行正确的释放,以免出现内存泄漏问题(开发期间可以开启内存泄漏检测机制),释放的方式如果是ByteBuf调用release(),则引用计数减一;如果是使用ReferenceCounted的release()方法,则不管当前值,直接置0。

总结

Netty中用的组件包括Channel、EventLoopGroup、ChannelHandler、ChannelPipeline、ChannelFuture、BootStrap、ByteBuf、编解码器等。每一个都在其中发挥着重要作用。

Channel作为一个到网络的连接,连接着数据的读写操作;

EventLoopGroup作为线程模型的一部分,对创建的Channel进行EventLoop的分配;

ChannelHandler核心接口,业务逻辑、数据处理都离不开它;

ChannelPipeline组织多个ChannelHandler,完成数据与业务流的处理;

ChannelFuture作为异步和事件通知的核心组件;

BootStrap应用程序启动的引导类,分为了服务端和客户端的引导,所谓引导就是设置和组织其它组件;

ByteBuf数据处理的容器,用来装数据的;

编解码器的存在也是Netty的强大之处,Netty内置的编解码器极大的提高了网络程序开发的便捷。