前言

Netty作为一款异步事件驱动的网络编程框架,具有高性能和易于开发的API。所以它在很多优秀的框架中都被选为网络通讯的组件,如ES、Dubbo等。本文基于Netty来实现一个简易版的PRC框架;一方面对Netty的实践做个记录,另一方面引出一款RPC框架都需要哪些基本的东西。

RPC简介

RPC(Remote Procedure Call)叫远程过程调用,它实现了一种服务间通讯的方式,简单说就是调用远程服务就像调用本地方法一样。所谓远程服务可以理解成两个进程的另一个;就像调用本地方法一样,是对于调用方,其不同关心调用远程和通讯的细节。

有了RPC框架,服务间调用,如下所示:

一些主流的框架如Apache托管的Dubbo和Thrift、Google的gPRC都是非常优秀的RPC框架,那么具体RPC框架需要哪些东西

  1. 数据在网络上传输就需要序列化,那么就需要序列化(对象到字节)或反序列化(字节到对象)技术,对应着不同的数据格式,如XStream的xml序列化、Fastjson的json序列化、还有Hessian、Protobuf等;

  2. 客户端需要发起调用,还需要动态代理,通过代理类来屏蔽调用细节,如Cglib、JDK的动态代理等;

  3. 通讯协议,通讯就涉及到双方需要约定协议,就像TCP、HTTP等属于通讯协议。协议定义了相关数据的格式和标准,双方按照统一的规范定义数据进而完成数据的处理,这也是协议的本质。

  4. 最后实现通讯能力,通过今天的核心内容Netty实现;

有了上面这些内容,我们再来看看一个简易的RPC框架如何实现调用

RPC与HTTP有什么区别呢

  • 协议层面:PRC是一种通讯抽象,不是协议,它可以基于HTTP实现通讯;而HTTP是一种应用层协议;

  • 场景上;RPC常用于微服务架构下服务间的通讯;HTTP常用于浏览器与服务器通讯;

  • 数据格式上:RPC支持Json、XML、二进制等;HTTP主要是JSON、XML等;

基于Netty的RPC实现

项目初始化

创建项目netty-rpc,对应完整代码github地址>

项目结果如下:

--netty-rpc

----netty-rpc-client:客户端模块

----netty-rpc-com:公共模块

----netty-rpc-server:服务器模块

下面示例若非客户端和服务器,则均实现在com模块中。

netty-rpc最外层pom配置:

	<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <fastjson2.version>2.0.23</fastjson2.version>
        <lombok.version>1.18.24</lombok.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.fastjson2</groupId>
                <artifactId>fastjson2</artifactId>
                <version>${fastjson2.version}</version>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.106.Final</version> <!-- 使用最新的Netty版本 -->
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>compile</scope>
                <version>${lombok.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- 实际依赖 -->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>compile</scope>
        </dependency>
    </dependencies>

netty-rpc-client与netty-rpc-server对应pom:

    <dependencies>
        <!-- 公共common -->
        <dependency>
            <groupId>top.xudj</groupId>
            <artifactId>netty-rpc-com</artifactId>
            <version>${parent.version}</version>
        </dependency>
    </dependencies>

定义序列化实现

使用Fastjson完成序列化和反序列化,序列化接口ISerializer和Json序列化实现类JsonSerializer。

/**
 * 序列化接口
 */
public interface ISerializer {

    /**
     * 序列化
     * @param obj
     * @return
     */
    <T> byte[] serialize(T obj);

    /**
     * 反序列化
     * @param data
     * @param clazz
     * @return
     */
    <T> T deserialize(byte[] data, Class<T> clazz);

}

/**
 * 使用Json序列化
 */
public class JsonSerializer implements ISerializer {

    private JSONReader.Feature[] features = {JSONReader.Feature.SupportClassForName};

    @Override
    public <T> byte[] serialize(T obj) {
        return JSON.toJSONBytes(obj);
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        return JSON.parseObject(data, clazz, features);
    }
}

定义协议标准

一般定义协议需要定义数据格式、消息序列化方式、错误检测、安全性、兼容性等等,其实都是在定义数据内容,不同的内容表示不同的作用。

本文通过协议来统一数据格式,什么字段表示什么含义等。

/**
 * 协议(请求内容)
 * prc request
 */
@Getter
@Setter
@ToString
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    // 为下面变量添加注释

    /**
     * 请求id
     */
    private String requestId;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;

    /**
     * 参数
     */
    private Object[] parameters;


}

/**
 * 协议(响应内容)
 * prc response
 */
@Getter
@Setter
@ToString
public class RpcResponse {

    /**
     * 响应id
     */
    private String requestId;

    /**
     * 异常
     */
    private String error;

    /**
     * 返回的结果
     */
    private Object result;

}

定义编解码器

使用协议标准和序列化方式完成数据与字节码的互转;同时定义如何区分一条数据的完整性,一般有分隔符数据长度标识,分隔符包括有换行符或者自定义符合;数据长度标识可以在消息的最开头定义消息的长度或者协议定义使用固定长度。

本文通过自定义编解码器使用长度字段来定义。

/**
 * rpc 编码器 使用netty的组件
 * @see io.netty.handler.codec.MessageToByteEncoder
 */
public class RpcEncoder extends MessageToByteEncoder {
    /**
     * 自定义的编码器
     */
    private ISerializer serializer;

    public RpcEncoder(ISerializer serializer) {
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object t, ByteBuf byteBuf) throws Exception {
        // 序列化
        byte[] bytes = serializer.serialize(t);
        // 写入长度
        byteBuf.writeInt(bytes.length);
        // 写入内容
        byteBuf.writeBytes(bytes);
    }

}

/**
 * rpc 解码器 使用netty的组件
 * @see io.netty.handler.codec.ByteToMessageDecoder
 */
public class RpcDecoder extends ByteToMessageDecoder {

    // 注意,需要定义class,完成数据的解析成指定类型,不然ServerHandler会因为无法匹配类型无法执行
    private Class clazz;
    private ISerializer serializer;

    public RpcDecoder(Class clazz, ISerializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 读取长度,首先要大于写入的int长度(4字节)
        if (byteBuf.readableBytes() < 4) {
            System.out.println("RpcDecoder.decode: byteBuf.readableBytes() < 4");
            return;
        }
        // 标记一下当前的readIndex的位置
        byteBuf.markReaderIndex();
        // 读取长度
        int dataLength = byteBuf.readInt();
        // 长度如果小于0,关闭连接
        if (dataLength <= 0) {
            System.out.println("RpcDecoder.decode: dataLength < 0");
            channelHandlerContext.close();
            return;
        }

        // 如果可读长度小于内容长度,resetReaderIndex
        if (byteBuf.readableBytes() < dataLength) {
            System.out.println("RpcDecoder.decode: byteBuf.readableBytes() < dataLength");
            byteBuf.resetReaderIndex();
            return;
        }

        // 读取内容
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);
        // 反序列化
        Object obj = serializer.deserialize(data, clazz);
        list.add(obj);
    }
}

定义Api接口

定义一个简单的hello接口;另外IService作为一个标识方便模拟服务器实现类的查询。

/**
 * 仅仅作为一个声明
 */
public interface IService {
}
/**
 * 本地接口
 */
public interface HelloService extends IService {

    String hello(String name);

}

实现客户端

客户端定义包括代理、建立连接进行通讯、发送请求与响应

  • 定义代理

/**
 * RPC客户端动态代理
 */
public class RpcClientDynamicProxy implements InvocationHandler {

    private ClientNetty client;
    public RpcClientDynamicProxy(ClientNetty client) {
        this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        String requestId = UUID.randomUUID().toString();

        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();
        // 检查是否是Object类的方法
        // 为了防止在调试情况下,频繁调用toString方法,导致非预期rpc调用
        if (method.getDeclaringClass() == Object.class) {
            return handleObjectMethodInvoke(proxy, methodName, args);
        }
        Class<?>[] parameterTypes = method.getParameterTypes();

        request.setRequestId(requestId);
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(args);
        System.out.println("request: " + request);

        // 发送请求
        RpcResponse rpcResponse = client.send(request);
        System.out.println("rpcResponse: " + rpcResponse);

        return Optional.ofNullable(rpcResponse).map(r -> r.getResult()).orElse(null);
    }


    /**
     * 处理Object类的方法
     *
     * @param proxy: 代理对象
     * @param methodName: 方法名
     * @param args: 参数
     * @return
     */
    private Object handleObjectMethodInvoke(Object proxy, String methodName, Object[] args) {
        switch (methodName) {
            case "toString":
                return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
            case "hashCode":
                return System.identityHashCode(proxy);
            case "equals":
                return proxy == args[0];
        }
        throw new UnsupportedOperationException(methodName);
    }

}

/**
 * 客户端的代理工厂
 */
public class ClientProxyFactory {

    /**
     * 创建代理对象
     *
     * @param interfaceClass: 接口类型
     * @param client: 客户端
     * @param <T>
     * @return
     */
    public static <T> T create(Class<T> interfaceClass, ClientNetty client) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class[]{interfaceClass},
                new RpcClientDynamicProxy(client));
    }

}

  • 基于Netty的通讯

/**
 * 客户端
 */
public class ClientNetty {

    // 事件循环组
    private EventLoopGroup eventLoopGroup;
    // 处理器
    private ClientHandler clientHandler;
    // 通道
    private Channel channel;

    /**
     * 连接服务端
     * @param host: 服务端地址
     * @param port: 服务端端口
     */
    public void connect(String host, int port) {
        eventLoopGroup = new NioEventLoopGroup();
        clientHandler = new ClientHandler();
        // 启动类
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                // in顺序: RpcDecoder -> clientHandler
                // out顺序: clientHandler -> RpcEncoder
                ChannelPipeline pipeline = channel.pipeline();
                // 添加编解码器 RpcEncoder未指定范型
                pipeline.addLast(new RpcEncoder(new JsonSerializer()));
                pipeline.addLast(new RpcDecoder(RpcResponse.class, new JsonSerializer()));

                // 添加自定义的处理器
                pipeline.addLast(clientHandler);
            }
        });


        try {
            // 等待连接成功
            ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
                    .addListener((ChannelFutureListener) channelFuture ->
                            System.out.println("客户端连接状态:" + channelFuture.isSuccess())).sync();
            channel = connectFuture.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送请求
     *
     * @param rpcRequest: 请求对象
     * @return
     */
    public RpcResponse send (final RpcRequest rpcRequest) {
        // 发送请求并等待
        try {
            channel.writeAndFlush(rpcRequest).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 获取响应结果
        return clientHandler.getRpcResponse(rpcRequest.getRequestId());
    }

    /**
     * 关闭通道
     */
    public void shutdownGracefully() {
        System.out.println("客户端优雅关闭");
        eventLoopGroup.shutdownGracefully();
    }
}

  • 请求与响应

/**
 * 处理器,支持出入站数据处理
 * @see ChannelInboundHandlerAdapter
 * @see io.netty.channel.ChannelOutboundHandler
 */
public class ClientHandler extends ChannelDuplexHandler {

    /**
     * 使用Map维护请求对象ID与响应结果Future的映射关系
     * CompletableFuture: JDK8新增的异步编程工具类
     */
    private Map<String, CompletableFuture> futureMap = new ConcurrentHashMap<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RpcResponse) {
            RpcResponse rpcResponse = (RpcResponse) msg;
            // 解析结果
            CompletableFuture completableFuture = futureMap.get(rpcResponse.getRequestId());
            // 给到结果
            completableFuture.complete(rpcResponse);
        }
        // 触发下一个ChannelInboundHandler
        super.channelRead(ctx, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof RpcRequest) {
            RpcRequest rpcRequest = (RpcRequest) msg;
            // 发送前,先将请求对象ID与响应结果Future的映射关系存入Map
            futureMap.put(rpcRequest.getRequestId(), new CompletableFuture());
        }
        super.write(ctx, msg, promise);
    }

    /**
     * 获取响应结果
     *
     * @param requestId: 请求对象ID
     * @return
     */
    public RpcResponse getRpcResponse(String requestId) {
        try {
            // 从Map中获取响应结果Future
            CompletableFuture completableFuture = futureMap.get(requestId);
            // 获取结果,如果没有结果会阻塞
            return (RpcResponse) completableFuture.get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

}

实现服务器

服务器定义包括通讯、接收请求与响应、实现业务逻辑

  • 基于Netty的通讯

/**
 * RPC服务端
 */
public class ServerNetty {

    /**
     * 绑定端口
     * @param port
     */
    public void bind(int port) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();

        // 服务器netty启动器
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // in顺序: RpcDecoder -> ServerHandler
                        // out顺序: ServerHandler -> RpcEncoder
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 添加编码器
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerializer()));
                        pipeline.addLast(new RpcEncoder(new JsonSerializer()));
                        // 添加自定义处理器
                        pipeline.addLast(new ServerHandler());
                    }
                });
        try {
            // 异步等待启动
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("server start on port: " + port);
            // 等待服务器关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("server shutdown");
            // 优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

  • 接收请求与响应

/**
 * RPC服务处理器
 */
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    /**
     * 读取客户端发来的请求数据
     *
     * @param ctx: 通道处理器上下文
     * @param rpcRequest: 客户端发来的请求数据
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());

        // 通过反射调用服务方法
        try {
            Object result = reflect(rpcRequest);
            System.out.println("ServerHandler.channelRead0: result = " + result);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e.getMessage());
            e.printStackTrace();
        }
        ctx.writeAndFlush(rpcResponse);
    }

    /**
     * 通过反射调用服务方法
     *
     * @param rpcRequest: 客户端发来的请求数据
     * @return
     */
    private Object reflect(RpcRequest rpcRequest) throws Exception {
        // 使用 Class forName
        Class<?> aClass = Class.forName(rpcRequest.getClassName());
        // 查找实现类
        IService serviceImpl = ServiceHelper.getServiceImpl(aClass);

        // 得到 Class getMethod
        Method method = serviceImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        method.setAccessible(true);

        Object result = method.invoke(serviceImpl, rpcRequest.getParameters());
        return result;
    }
}

  • 实现业务逻辑

/**
 * HelloService 实现类
 */
public class HelloServiceImpl implements HelloService, IService {

    @Override
    public String hello(String name) {
        return "Hello, " + name;
    }

}

/**
 * 获取service的实现类
 */
public class ServiceHelper {

    public static Map<Class, IService> serviceMap = new HashMap<>();
    static {
        serviceMap.put(HelloService.class, new HelloServiceImpl());
    }

    /**
     * 获取service的实现类
     *
     * @param clazz: 类类型
     * @return
     */
    public static IService getServiceImpl(Class clazz) {
        return serviceMap.get(clazz);
    }

}

演示

先启动下面服务器演示类Server;再启动客户端演示类ClientApp;

在ClientApp控制台输入文本进行换行,观察客户端与服务器的控制台效果。

  • 客户端演示类(调用方)

/**
 * Client 演示类
 */
public class ClientApp {

    public static void main(String[] args) {
        // 创建客户端
        ClientNetty client = new ClientNetty();
        client.connect(Constants.HOST, Constants.PORT);
        try {
            HelloService helloService = ClientProxyFactory.create(HelloService.class, client);
            // 生成代理类
            // buildProxy();
            while (true) {
                // 从控制台读取
                Scanner scanner = new Scanner(System.in);
                String nextLine = scanner.nextLine();
                if ("exit".equals(nextLine)) {
                    System.exit(0);
                }
                String hello = helloService.hello(nextLine);
                System.out.println(hello);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client.shutdownGracefully();
        }
    }

    /**
     * 可以生成代理类class
     */
    public static void buildProxy() {
        byte[] bytes = ProxyGenerator.generateProxyClass("HelloService$proxy", new Class[]{HelloService.class});
        String fileName = System.getProperty("user.dir")+"/netty-rpc-client/target/HelloService$proxy.class";
        try {
            File file = new File(fileName);
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            fileOutputStream.write(bytes);
            fileOutputStream.flush();
            fileOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

  • 服务端演示类(被调用方)

/**
 * 服务端演示类
 */
public class Server {

    public static void main(String[] args) {
        ServerNetty serverNetty = new ServerNetty();
        serverNetty.bind(Constants.PORT);
    }

}

总结

至此,完成了一个简易版的基于 Netty 的 RPC 框架,麻雀虽小,五脏俱全。一个 RPC 框架至少包括了协议、代理、通讯、序列化方式等核心组件。

然而,在实际企业微服务架构中,还需要考虑负载均衡、服务注册与发现、服务治理、安全性、性能优化、分布式事务、监控与日志、跨语言支持、持续维护与更新、文档与培训、社区与支持、性能与压力测试等关键考虑因素和组件,以构建一个强大、可靠、高效的分布式系统。