青海建设厅网站首页,wordpress 网上支付,全国疫苗接种率,免费简历制作网站推荐文章目录 基于 Netty 的简易版 RPC需求分析简易RPC框架的整体实现协议模块 #x1f4d6;自定义协议 #x1f195;序列化方式 #x1f522; 服务工厂 #x1f3ed;服务调用方 ❓前置知识——动态代理#x1f573;️Proxy类InvocationHandler 接口 RPC服务代理类内嵌Netty客… 文章目录 基于 Netty 的简易版 RPC需求分析简易RPC框架的整体实现协议模块 自定义协议 序列化方式 服务工厂 服务调用方 ❓前置知识——动态代理️Proxy类InvocationHandler 接口 RPC服务代理类内嵌Netty客户端核心handlerRpcResponseMessageHandler 服务提供方核心handlerRpcRequestMessageHandler 撸了这么多验收一下吧 在之前的博文中我们学习了Netty的基础知识了解了其原理和组件。在本篇博文中我们将结合实际案例分享一个基于Netty的简化版RPC远程过程调用实现。通过这个案例我们不仅可以学习Netty的使用和原理还能够对RPC的设计有一个整体的学习。 项目托管与gitee
gw-rpc 基于 Netty 的简易版 RPC需求分析
随着分布式和微服务的盛行给我们的项目带来的收益是不同模块间的解耦从而使整个软件开发流程更加的灵活。同时模块间的调用是稀松平常的事情。这就会出现一系列的新的需求
不同的模块有可能是分布在不同的机器上要想相互调用一定会涉及到网络传输所以要有相应的通信模块。其次网络传输的数据是二进制流。而在面向对象的程序中业务处理的是对象这就需要发送方在网络发送之前把对象序列化成二进制流同时网络接收方收到二进制流后需要把二进制流反序列化成对象。同时为了让调用方调用远程服务像调用本地方法一样简单需要对网络请求、序列化做封装Java 中一般采用动态代理去实现。还有要有注册中心提供服务方的地址列表同时出现了新的服务节点需要注册中心及时发现这样调用方才能找到合适的服务方。最后还需要负载均衡、熔断、降级、心跳探活等功能。
本篇博文中只讲解一个简化版的 RPC 设计注册中心和负载均衡及心跳探活等功能就不讲了…… 捋了捋大致流程如下 首先客户端通过动态代理模块获取代理实例。接下来客户端通过动态代理模块 来调用动态代理方法用来实现封装 RpcRequestMessage 对象把要调用的服务和方法以及方法参数准备通过网络请求发送出去。在发送之前通过编码模块转换对象序列化为字节数组。动态代理随后会通过网络通信把序列化成字节数组的请求发送给服务端同时客户端同步或异步等待服务端的响应。这些工作都由动态代理完成对于调用方来说是无感的。服务端收到客户端的请求后把字节数组反序列化成业务对象。服务端根据请求中要调用的类和方法通过反射实例化类并找到对应的方法。服务端用收到的参数调用本地方法后封装响应对象。把响应对象序列化为字符数组。服务端把序列化的响应对象通过网络返回给客户端。客户端收到序列化成字节数组的响应后反序列化成响应对象。
简易RPC框架的整体实现
我们的 RPC 项目主要分为下面几个模块结构非常清晰 配套的源码源代码地址gw-rpc 。 主要分为以下几个基本模块。
协议模块设计了通信请求体、响应体序列化模式。
服务工厂通过map维护接口类和实现类的映射关系。
服务调用方模块实现了服务调用方的基本功能同时包含了动态代理的功能实现。
服务提供方模块实现了服务提供方的基本功能。 协议模块
自定义协议
关于协议模块拆分至另外一篇博文进行讲解使用Netty进行协议开发多协议支持与自定义协议的实现。
序列化方式
对于RPC来说序列化是一个必不可少的过程它将业务对象以字节数组的形式在网络中进行传输。为了实现序列化功能通常定义一个序列化接口其中包含序列化方法和反序列化方法。在实际应用中提供几种常见的序列化方式可供选择包括以下几种
Java JDK 自带的序列化Java 提供了默认的序列化机制通过ObjectInputStream和ObjectOutputStream实现。对象可以以二进制形式进行序列化和反序列化。这种方式简单易用但存在一些性能和版本兼容性的问题。Json算法使用Gson库将对象转换为JSON字符串并通过JSON字符串进行序列化和反序列化操作。Hessian算法使用Hessian库将对象序列化为字节数组提供了更高的性能和较小的序列化结果。 /*** Description: 序列化** author LinHuiBa-YanAn* date 2023/8/7 20:32*/public interface Serializer {/*** 反序列化方法** param clazz 类型* param bytes 字节码* param T 类型* return 对象*/T T deserialize(ClassT clazz, byte[] bytes);/*** 序列化方法** param object 对象* param T 类型* return byte[]*/T byte[] serialize(T object);enum Algorithm implements Serializer {Java {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {try {ObjectInputStream ois new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(反序列化失败, e);}}Overridepublic T byte[] serialize(T object) {try {ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException(序列化失败, e);}}},Json {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {Gson gson new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}Overridepublic T byte[] serialize(T object) {Gson gson new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}},Hessian {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(bytes);HessianInput hessianInput new HessianInput((byteArrayInputStream));// 反序列化成对象Object object null;try {object hessianInput.readObject(clazz);} catch (IOException e) {e.printStackTrace();} finally {hessianInput.close();}return (T) object;}Overridepublic T byte[] serialize(T object) {ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream();byte[] bytes new byte[0];try {HessianOutput ho new HessianOutput(byteArrayOutputStream);ho.writeObject(object);bytes byteArrayOutputStream.toByteArray();} catch (IOException e) {e.printStackTrace();} finally {try {byteArrayOutputStream.close();} catch (IOException e) {e.printStackTrace();}return bytes;}}}}/*** 适配器*/class ClassCodec implements JsonSerializerClass?, JsonDeserializerClass? {Overridepublic Class? deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}Overridepublic JsonElement serialize(Class? src, Type typeOfSrc, JsonSerializationContext context) {// class - jsonreturn new JsonPrimitive(src.getName());}}}对象的序列化和反序列化的需求在于当我们收到数据的时候需要把二进制的 byte 数组转换为业务对象这里就需要在 Netty 的 pipeline 中添加 inbound Handler而对于发送数据则需要把业务对象转换为二进制的 byte 数据也就是需要在 Netty 的 pipeline 中添加 outbound Handler。
服务工厂
ServicesFactory是一个用于创建服务类实例的Java类。它根据在application.properties文件中定义的配置属性将接口类与实现类进行映射并提供了一个getService方法用于获取接口类对应的实例。在类加载时它读取application.properties文件并将属性加载到Properties对象中。然后它遍历属性名称检查是否以Service结尾并获取相应的接口类和实现类。通过使用反射创建实现类的实例并将接口类和实例对象存储在ConcurrentHashMap中。通过调用getService方法并传入接口类可以获取对应的实现类实例。该类的设计允许根据配置文件动态创建服务类实例提供了一种灵活的方式来管理和获取服务实例。
/*** Description: 服务工厂** author YanAn* date 2023/8/7 20:54*/
public class ServicesFactory {static Properties properties;static MapClass?, Object map new ConcurrentHashMap();static {try (InputStream in Config.class.getResourceAsStream(/application.properties)) {properties new Properties();properties.load(in);SetString names properties.stringPropertyNames();for (String name : names) {if (name.endsWith(Service)) {Class? interfaceClass Class.forName(name);Class? instanceClass Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static T T getService(ClassT interfaceClass) {return (T) map.get(interfaceClass);}
}配置文件举例
com.gw.core.service.HelloServicecom.gw.core.service.impl.HelloServiceImpl服务调用方 ❓
前置知识——动态代理️
在前几年代购家喻户晓。何为代购简单来说就是找人帮忙购买所需要的商品当然可能需要向实施代购的人支付一定的费用。在软件开发中也有一种设计模式可以提供与代购类似的功能由于某些原因客户端不想或不能直接访问一个对象此时可以通过一个称为“代理”的第三者来实现间接访问该访问对应的设计模式被称为代理模式。即 给某一个对象提供一个代理或占位符并由代理对象来控制对原对象的访问。
那什么是动态代理呢动态代理Dynamic Proxy。可以让系统在运行时根据实际需要来动态创建代理类让同一个代理类能够代理多个不同的真实主题类而且可以代理不同的方法。动态代理是一种较为高级的代理模式它在事务管理、AOP等领域都发挥了重要的作用。 从jdk1.3开始java就提供了对动态代理的支持。下面简要说明一下 Proxy类
Proxy类提供了用于创建动态代理类和实例对象的方法它是所创建的动态代理类的父类。我们直接去看它的核心方法
getProxyClass方法 用于返回一个 Class 类型的代理类在参数中需要提供类加载器并需要指定代理的接口数组。
public static Class? getProxyClass(ClassLoader loader,Class?... interfaces)loader类加载器interfaces代理的接口数组
newProxyInstance方法 用于返回一个动态创建的代理类的实例。
public static Object newProxyInstance(ClassLoader loader,Class?[] interfaces,InvocationHandler h)loader类加载器interfaces代理类所实现的接口列表h所指派的调用处理程序类我们可以在这个类中添加公共逻辑比如网络逻辑
InvocationHandler 接口
InvocationHandler接口是代理处理程序类的实现接口该接口作为代理实例的调用处理者的公共父类每一个代理类的实例都可以提供一个相关的具体调用出阿里着InvocationHandler 接口的实现类。在实现该接口的同时必须得实现InvocationHandler接口中声明的invoke方法
public Object invoke(Object proxy, Method method, Object[] args)throws Throwable;该方法用于处理对代理实例的方法调用并返回相应的结果当一个代理实例中的业务方法被调用时将自动调用该方法。
proxy代理类的实例method需要代理的方法args代理方法的参数数组
动态代理类需要在运行时指定所代理真实主题类的接口客户端在调用动态代理对象的方法时调用请求会将请求自动转发给 InvocationHandler 对象的 invoke() 方法由 invoke() 方法来实现对请求的统一处理。
RPC服务代理类
我们在Proxy的基础上封装了一个代理模块。在 invoke() 方法中我们将需要调用的接口方法和其他相关信息封装成一个业务对象并使用 RpcClient.getChannel() 方法获取通道。然后我们将封装好的消息 msg 写入并刷新通道发送给远程服务器。
同时我们创建了一个与通道关联的 DefaultPromise 对象用于处理异步操作的结果。我们将生成的序列号和 promise 对象存放在 RpcResponseMessageHandler.PROMISES 集合中后面会讲解该核心handler以便在接收到响应时进行对应处理。
接下来我们等待 promise 对象的完成。一旦 promise 对象完成我们根据其结果进行判断。如果操作成功我们返回相应的结果如果操作失败我们抛出一个异常来表示错误情况。
Slf4j
public class RpcServiceProxy {/*** 获取代理实例** param serviceClass 服务类.class* param T 服务类.class* return 执行结果*/public static T T getProxyService(ClassT serviceClass) {ClassLoader loader serviceClass.getClassLoader();Class?[] interfaces new Class[]{serviceClass};Object obj Proxy.newProxyInstance(loader, interfaces, new RpcServiceProxyInvocationHandler(serviceClass));return (T) obj;}/*** The class that actually implements the proxy logic*/static class RpcServiceProxyInvocationHandler implements InvocationHandler {private final Class referenceConfig;public RpcServiceProxyInvocationHandler(Class referenceConfig) {this.referenceConfig referenceConfig;}Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {int sequenceId SequenceIdGenerator.nextId();RpcRequestMessage msg new RpcRequestMessage(sequenceId,referenceConfig.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);RpcClient.getChannel().writeAndFlush(msg);DefaultPromiseObject promise new DefaultPromise(RpcClient.getChannel().eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);promise.await();if (promise.isSuccess()) {return promise.getNow();} else {throw new RuntimeException(promise.cause());}}}
}内嵌Netty客户端
在RpcServiceProxy类中内嵌Netty客户端类用于与服务提供方建立连接并进行通信。
/*** 内嵌Netty客户端*/
static class RpcClient {/*** channel*/private static Channel channel null;/*** lock*/private static final Object LOCK new Object();/*** get channel** return Channel*/public static Channel getChannel() {if (channel ! null) {return channel;}synchronized (LOCK) {if (channel ! null) {return channel;}initChannel();return channel;}}/*** init channel*/private static void initChannel() {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler loggingHandler new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable messageCodec new MessageCodecSharable();RpcResponseMessageHandler rpcHandler new RpcResponseMessageHandler();Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(loggingHandler);ch.pipeline().addLast(messageCodec);ch.pipeline().addLast(rpcHandler);}});try {channel bootstrap.connect(Config.getServerIp(), Config.getProjectPort()).sync().channel();channel.closeFuture().addListener(future - group.shutdownGracefully());} catch (Exception e) {log.error(client error, e);}}
}主要有四个Handler分别是
ProtocolFrameDecoder协议帧解码器LoggingHandler日志处理MessageCodecSharable消息的解编码器RpcResponseMessageHandlerRpc响应消息处理程序
核心handlerRpcResponseMessageHandler
核心方法 channelRead0 在代理模块中负责处理服务器的响应。在方法中首先从从维护的 PROMISES 集合中删除与当前响应相关的映射关系。随后响应消息中获取结果并根据结果设置相应的 promise 对象以完成异步操作。
当 invoke() 方法中的 promise 对象等待结果时通过 channelRead0 方法的处理promise 对象将结束阻塞并获取到封装的执行结果。这样便完成了对远程方法调用的响应处理和结果返回过程。
/*** Description: Rpc响应消息处理程序** author LinHuiBa-YanAn* date 2023/8/8 10:29*/
Slf4j
ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandlerRpcResponseMessage {/*** The promise object used to receive the result*/public static final MapInteger, PromiseObject PROMISES new ConcurrentHashMap();Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.info(Netty rpc client receives the response:{}, msg);PromiseObject promise PROMISES.remove(msg.getSequenceId());if (promise ! null) {Object returnValue msg.getReturnValue();Exception exceptionValue msg.getExceptionValue();if (exceptionValue null) {promise.setSuccess(returnValue);} else {promise.setFailure(exceptionValue);}}}
}服务提供方
相对于服务端调用方模块而言服务提供方模块相对简单。通过前面几篇博文对 Netty 的学习我们已经具备了足够的知识来处理服务提供方的实现这里简直是小菜一碟啦
/*** Description: RPC服务端** author LinHuiBa-YanAn* date 2023/8/7 20:45*/
Slf4j
public class RpcServer {public static void main(String[] args) {log.info(netty rpc server starting......);NioEventLoopGroup boss new NioEventLoopGroup();NioEventLoopGroup worker new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();RpcRequestMessageHandler RPC_HANDLER new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel serverBootstrap.bind(Config.getProjectPort()).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}服务端demo已经写的手烂了还是老样子介绍一下pipeline上的handler
ProtocolFrameDecoder协议帧解码器LoggingHandler日志处理MessageCodecSharable消息的解编码器RpcRequestMessageHandlerRpc请求消息处理程序
核心handlerRpcRequestMessageHandler
/*** Description: Rpc请求消息处理程序** author LinHuiBa-YanAn* date 2023/8/7 20:52*/
Slf4j
ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandlerRpcRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage rpcRequest) {RpcResponseMessage rpcResponse new RpcResponseMessage();log.info(Netty rpc server receives the request:{}, rpcRequest);rpcResponse.setSequenceId(rpcRequest.getSequenceId());rpcResponse.setMessageType(rpcRequest.getMessageType());try {Object service ServicesFactory.getService(Class.forName(rpcRequest.getInterfaceName()));Method method service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object invoke method.invoke(service, rpcRequest.getParameterValue());rpcResponse.setReturnValue(invoke);} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {log.error(RPC processing failed. An exception occurred. Procedure. exception:{}, e.getMessage());rpcResponse.setExceptionValue(e);}ctx.writeAndFlush(rpcResponse);}
}核心方法 channelRead0 在代理模块中负责处理服务器的请求。在该方法中首先从请求消息体中获取需要调用的接口方法以及其他相关信息。这些信息通常包括接口名称、方法名称、参数类型和参数值等。
接下来通过服务工厂获取到需要调用接口的对象实例。服务工厂负责管理和创建服务实例以便在接收到请求时能够正确地调用相应的方法。
接着通过反射的方式执行需要调用的方法。根据接口名称、方法名称以及参数类型和参数值使用反射机制调用相应的方法并获取执行结果。
最后将执行的结果封装成响应消息并写入通道以便返回给服务调用方。响应消息通常包括执行结果、状态码和其他相关信息用于服务调用方处理和解析。
撸了这么多验收一下吧
首先启动服务提供方 以com.gw.core.service.HelloService#sayHello 方法为例 非常感谢您的阅读项目中还有其他小设计鼓励您深入研究和体验这些设计以便更好地理解和掌握项目的细节。 如果您在项目中遇到任何问题或需要进一步的帮助随时向我提问。我将尽力为您提供支持和解答。祝您在项目中取得成功并愉快地品尝这些小设计 项目托管与giteegw-rpc