公司网站应该怎么做,wordpress文章描述调用修改,海外版tiktok免费入口,淘宝网淘宝网页版RPC#xff08;Remote Procedure Call Protocol#xff09;远程过程调用协议#xff0c;它是一种通过网络#xff0c;从远程计算机程序上请求服务#xff0c;而不必了解底层网络技术的协议。说的再直白一点#xff0c;就是客户端在不必知道调用细节的前提之下#xff0c… RPCRemote Procedure Call Protocol远程过程调用协议它是一种通过网络从远程计算机程序上请求服务而不必了解底层网络技术的协议。说的再直白一点就是客户端在不必知道调用细节的前提之下调用远程计算机上运行的某个对象使用起来就像调用本地的对象一样。目前典型的RPC实现框架有Thriftfacebook开源、Dubboalibaba开源等等。RPC框架针对网络协议、网络I/O模型的封装是透明的对于调用的客户端而言它就认为自己在调用本地的一个对象。至于传输层上运用的是TCP协议、UDP协议、亦或是HTTP协议一概不关心。从网络I/O模型上来看是基于select、poll、epoll方式、还是IOCPI/O Completion Port方式承载实现的对于调用者而言也不用关心。 目前主流的RPC框架都支持跨语言调用即有所谓的IDL接口定义语言其实这个并不是RPC所必须要求的。如果你的RPC框架没有跨语言的要求IDL就可以不用包括了。 最后值得一提的是衡量一个RPC框架性能的好坏与否RPC的网络I/O模型的选择至关重要。在此基础上设计出来的RPC服务器可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型在高并发的状态下处理性能上会有很大的差别。还有一个衡量的标准就是选择的传输协议。是基于TCP协议、还是HTTP协议、还是UDP协议对性能也有一定的影响。但是从我目前了解的情况来看大多数RPC开源实现框架都是基于TCP、或者HTTP的目测没有采用UDP协议做为主要的传输协议的。 明白了RPC的使用原理和性能要求。现在我们能不能撇开那些RPC开源框架自己动手开发一个高性能的RPC服务器呢我想还是可以的。现在本人就使用Java基于Netty开发实现一个高性能的RPC服务器。 如何实现、基于什么原理并发处理性能如何请继续接着看下文。 我们有的时候为了提高单个节点的通信吞吐量提高通信性能。如果是基于Java后端的一般首选的是NIO框架No-block IO。但是问题也来了Java的NIO掌握起来要相当的技术功底和足够的技术积累使用起来才能得心应手。一般的开发人员如果要使用NIO开发一个后端的TCP/HTTP服务器附带考虑TCP粘包、网络通信异常、消息链接处理等等网络通信细节开发门槛太高所以比较明智的选择是采用业界主流的NIO框架进行服务器后端开发。主流的NIO框架主要有Netty、Mina。它们主要都是基于TCP通信非阻塞的IO、灵活的IO线程池而设计的应对高并发请求也是绰绰有余。随着Netty、Mina这样优秀的NIO框架设计上日趋完善Java后端高性能服务器开发在技术上提供了有力的支持保障从而打破了C在服务器后端一统天下的局面。因为在此之前Java的NIO一直受人诟病让人敬而远之 既然这个RPC服务器是基于Netty的那就在说说Netty吧。实际上Netty是对JAVA NIO框架的再次封装它的开源网址是http://netty.io/本文中使用的Netty版本是4.0版本可以通过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2进行下载使用。那也许你会问如何使用Netty进行RPC服务器的开发呢实际不难下面我就简单的说明一下技术原理 1、定义RPC请求消息、应答消息结构里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。 2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系然后等待客户端发起调用请求。 3、客户端发起的RPC消息里面包含远程调用的类名、方法名称、参数结构、参数值等信息通过网络以字节流的方式送给RPC服务端RPC服务端接收到字节流的请求之后去对应的容器里面查找客户端接口映射的具体实现对象。 4、RPC服务端找到实现对象的参数信息通过反射机制创建该对象的实例并返回调用处理结果最后封装成RPC应答消息通知到客户端。 5、客户端通过网络收到字节流形式的RPC应答消息进行拆包、解析之后显示远程调用结果。 上面说的是很简单但是实现的时候我们还要考虑如下的问题 1、RPC服务器的传输层是基于TCP协议的出现粘包咋办这样客户端的请求服务端不是会解析失败好在Netty里面已经提供了解决TCP粘包问题的解码器LengthFieldBasedFrameDecoder可以靠它轻松搞定TCP粘包问题。 2、Netty服务端的线程模型是单线程、多线程一个线程负责客户端连接连接成功之后丢给后端IO的线程池处理、还是主从模式客户端连接、后端IO处理都是基于线程池的实现。当然在这里我出于性能考虑使用了Netty主从线程池模型。 3、Netty的IO处理线程池如果遇到非常耗时的业务出现阻塞了咋办这样不是很容易把后端的NIO线程给挂死、阻塞本文的处理方式是对于复杂的后端业务分派到专门的业务线程池里面进行异步回调处理。 4、RPC消息的传输是通过字节流在NIO的通道Channel之间传输那具体如何实现呢本文是通过基于Java原生对象序列化机制的编码、解码器ObjectEncoder、ObjectDecoder进行实现的。当然出于性能考虑这个可能不是最优的方案。更优的方案是把消息的编码、解码器搞成可以配置实现的。具体比如可以通过protobuf、JBoss Marshalling方式进行解码和编码以提高网络消息的传输效率。 5、RPC服务器要考虑多线程、高并发的使用场景所以线程安全是必须的。此外尽量不要使用synchronized进行加锁改用轻量级的ReentrantLock方式进行代码块的条件加锁。比如本文中的RPC消息处理回调就有这方面的使用。 6、RPC服务端的服务接口对象和服务接口实现对象要能轻易的配置轻松进行加载、卸载。在这里本文是通过Spring容器进行统一的对象管理。 综上所述本文设计的RPC服务器调用的流程图如下所示 客户端并发发起RPC调用请求然后RPC服务端使用Netty连接器分派出N个NIO连接线程这个时候Netty连接器的任务结束。然后NIO连接线程是统一放到Netty NIO处理线程池进行管理这个线程池里面会对具体的RPC请求连接进行消息编码、消息解码、消息处理等等一系列操作。最后进行消息处理Handler的时候处于性能考虑这里的设计是直接把复杂的消息处理过程丢给专门的RPC业务处理线程池集中处理然后Handler对应的NIO线程就立即返回、不会阻塞。这个时候RPC调用结束客户端会异步等待服务端消息的处理结果本文是通过消息回调机制实现MessageCallBack。 再来说一说Netty对于RPC消息的解码、编码、处理对应的模块和流程具体如下图所示 从上图可以看出客户端、服务端对RPC消息编码、解码、处理调用的模块以及调用顺序了。Netty就是把这样一个一个的处理器串在一起形成一个责任链统一进行调用。 说了这么多现在先简单看下我设计实现的NettyRPC的代码目录层级结构 其中newlandframework.netty.rpc.core包是NettyRPC的核心实现。newlandframework.netty.rpc.model包里面则封装了RPC消息请求、应答报文结构以及RPC服务接口与实现绑定关系的容器定义。newlandframework.netty.rpc.config里面定义了NettyRPC的服务端文件配置属性。 下面先来看下newlandframework.netty.rpc.model包中定义的内容。具体是RPC消息请求、应答消息的结构定义 RPC请求消息结构 /*** filename:MessageRequest.java** Newland Co. Ltd. All rights reserved.** Description:rpc服务请求结构* author tangjie* version 1.0**/
package newlandframework.netty.rpc.model;import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;public class MessageRequest implements Serializable {private String messageId;private String className;private String methodName;private Class?[] typeParameters;private Object[] parametersVal;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId messageId;}public String getClassName() {return className;}public void setClassName(String className) {this.className className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName methodName;}public Class?[] getTypeParameters() {return typeParameters;}public void setTypeParameters(Class?[] typeParameters) {this.typeParameters typeParameters;}public Object[] getParameters() {return parametersVal;}public void setParameters(Object[] parametersVal) {this.parametersVal parametersVal;}public String toString() {return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append(messageId, messageId).append(className, className).append(methodName, methodName).toString();}
} RPC应答消息结构 /*** filename:MessageResponse.java** Newland Co. Ltd. All rights reserved.** Description:rpc服务应答结构* author tangjie* version 1.0**/
package newlandframework.netty.rpc.model;import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;public class MessageResponse implements Serializable {private String messageId;private String error;private Object resultDesc;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId messageId;}public String getError() {return error;}public void setError(String error) {this.error error;}public Object getResult() {return resultDesc;}public void setResult(Object resultDesc) {this.resultDesc resultDesc;}public String toString() {return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append(messageId, messageId).append(error, error).toString();}
} RPC服务接口定义、服务接口实现绑定关系容器定义提供给spring作为容器使用。 /*** filename:MessageKeyVal.java** Newland Co. Ltd. All rights reserved.** Description:rpc服务映射容器* author tangjie* version 1.0**/
package newlandframework.netty.rpc.model;import java.util.Map;public class MessageKeyVal {private MapString, Object messageKeyVal;public void setMessageKeyVal(MapString, Object messageKeyVal) {this.messageKeyVal messageKeyVal;}public MapString, Object getMessageKeyVal() {return messageKeyVal;}
} 好了定义好核心模型结构之后现在再向大家展示一下NettyRPC核心包newlandframework.netty.rpc.core的关键部分实现代码首先是业务线程池相关类的实现代码具体如下 线程工厂定义实现 /*** filename:NamedThreadFactory.java** Newland Co. Ltd. All rights reserved.** Description:线程工厂* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;public class NamedThreadFactory implements ThreadFactory {private static final AtomicInteger threadNumber new AtomicInteger(1);private final AtomicInteger mThreadNum new AtomicInteger(1);private final String prefix;private final boolean daemoThread;private final ThreadGroup threadGroup;public NamedThreadFactory() {this(rpcserver-threadpool- threadNumber.getAndIncrement(), false);}public NamedThreadFactory(String prefix) {this(prefix, false);}public NamedThreadFactory(String prefix, boolean daemo) {this.prefix prefix -thread-;daemoThread daemo;SecurityManager s System.getSecurityManager();threadGroup (s null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();}public Thread newThread(Runnable runnable) {String name prefix mThreadNum.getAndIncrement();Thread ret new Thread(threadGroup, runnable, name, 0);ret.setDaemon(daemoThread);return ret;}public ThreadGroup getThreadGroup() {return threadGroup;}
} 业务线程池定义实现 /*** filename:RpcThreadPool.java** Newland Co. Ltd. All rights reserved.** Description:rpc线程池封装* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class RpcThreadPool {//独立出线程池主要是为了应对复杂耗I/O操作的业务不阻塞netty的handler线程而引入//当然如果业务足够简单把处理逻辑写入netty的handlerChannelInboundHandlerAdapter也未尝不可public static Executor getExecutor(int threads, int queues) {String name RpcThreadPool;return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues 0 ? new SynchronousQueueRunnable(): (queues 0 ? new LinkedBlockingQueueRunnable(): new LinkedBlockingQueueRunnable(queues)),new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));}
} /*** filename:AbortPolicyWithReport.java** Newland Co. Ltd. All rights reserved.** Description:线程池异常策略* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {private final String threadName;public AbortPolicyWithReport(String threadName) {this.threadName threadName;}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg String.format(RpcServer[ Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)],threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());System.out.println(msg);throw new RejectedExecutionException(msg);}
} RPC调用客户端定义实现 /*** filename:MessageSendExecutor.java** Newland Co. Ltd. All rights reserved.** Description:Rpc客户端执行模块* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.lang.reflect.Proxy;public class MessageSendExecutor {private RpcServerLoader loader RpcServerLoader.getInstance();public MessageSendExecutor(String serverAddress) {loader.load(serverAddress);}public void stop() {loader.unLoad();}public static T T execute(ClassT rpcInterface) {return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(),new Class?[]{rpcInterface},new MessageSendProxyT(rpcInterface));}
} 这里的RPC客户端实际上是动态代理了MessageSendProxy当然这里是应用了JDK原生的动态代理实现你还可以改成CGLIBCode Generation Library方式。不过本人测试了一下CGLIB方式在高并发的情况下面会出现空指针异常但是同样的情况JDK原生的动态代理却没有问题。并发程度不高的情况下面两种代理方式都运行正常。后续再深入研究看看吧废话不说了现在给出MessageSendProxy的实现方式 /*** filename:MessageSendProxy.java** Newland Co. Ltd. All rights reserved.** Description:Rpc客户端消息处理* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;public class MessageSendProxyT implements InvocationHandler {private ClassT cls;public MessageSendProxy(ClassT cls) {this.cls cls;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {MessageRequest request new MessageRequest();request.setMessageId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setTypeParameters(method.getParameterTypes());request.setParameters(args);MessageSendHandler handler RpcServerLoader.getInstance().getMessageSendHandler();MessageCallBack callBack handler.sendRequest(request);return callBack.start();}
} 进一步发现MessageSendProxy其实是把消息发送给RpcServerLoader模块它的代码如下 /*** filename:RpcServerLoader.java** Newland Co. Ltd. All rights reserved.** Description:rpc服务器配置加载* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;public class RpcServerLoader {private volatile static RpcServerLoader rpcServerLoader;private final static String DELIMITER :;private RpcSerializeProtocol serializeProtocol RpcSerializeProtocol.JDKSERIALIZE;//方法返回到Java虚拟机的可用的处理器数量private final static int parallel Runtime.getRuntime().availableProcessors() * 2;//netty nio线程池private EventLoopGroup eventLoopGroup new NioEventLoopGroup(parallel);private static ThreadPoolExecutor threadPoolExecutor (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);private MessageSendHandler messageSendHandler null;//等待Netty服务端链路建立通知信号private Lock lock new ReentrantLock();private Condition signal lock.newCondition();private RpcServerLoader() {}//并发双重锁定public static RpcServerLoader getInstance() {if (rpcServerLoader null) {synchronized (RpcServerLoader.class) {if (rpcServerLoader null) {rpcServerLoader new RpcServerLoader();}}}return rpcServerLoader;}public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {String[] ipAddr serverAddress.split(RpcServerLoader.DELIMITER);if (ipAddr.length 2) {String host ipAddr[0];int port Integer.parseInt(ipAddr[1]);final InetSocketAddress remoteAddr new InetSocketAddress(host, port);threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this, serializeProtocol));}}public void setMessageSendHandler(MessageSendHandler messageInHandler) {try {lock.lock();this.messageSendHandler messageInHandler;//唤醒所有等待客户端RPC线程signal.signalAll();} finally {lock.unlock();}}public MessageSendHandler getMessageSendHandler() throws InterruptedException {try {lock.lock();//Netty服务端链路没有建立完毕之前先挂起等待if (messageSendHandler null) {signal.await();}return messageSendHandler;} finally {lock.unlock();}}public void unLoad() {messageSendHandler.close();threadPoolExecutor.shutdown();eventLoopGroup.shutdownGracefully();}public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {this.serializeProtocol serializeProtocol;}
} 好了现在一次性给出RPC客户端消息编码、解码、处理的模块实现代码。 /*** filename:MessageSendInitializeTask.java** Newland Co. Ltd. All rights reserved.** Description:Rpc客户端线程任务处理* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;public class MessageSendInitializeTask implements Runnable {private EventLoopGroup eventLoopGroup null;private InetSocketAddress serverAddress null;private RpcServerLoader loader null;MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {this.eventLoopGroup eventLoopGroup;this.serverAddress serverAddress;this.loader loader;}public void run() {Bootstrap b new Bootstrap();b.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);b.handler(new MessageSendChannelInitializer());ChannelFuture channelFuture b.connect(serverAddress);channelFuture.addListener(new ChannelFutureListener() {public void operationComplete(final ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()) {MessageSendHandler handler channelFuture.channel().pipeline().get(MessageSendHandler.class);MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);}}});}
} /*** filename:MessageSendChannelInitializer.java** Newland Co. Ltd. All rights reserved.** Description:Rpc客户端管道初始化* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;public class MessageSendChannelInitializer extends ChannelInitializerSocketChannel {//ObjectDecoder 底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候//消息头开始即为长度字段占据4个字节。这里出于保持兼容的考虑final public static int MESSAGE_LENGTH 4;protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder//的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));//利用LengthFieldPrepender回填补充ObjectDecoder消息报文头pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));pipeline.addLast(new ObjectEncoder());//考虑到并发性能采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));pipeline.addLast(new MessageSendHandler());}
} /*** filename:MessageSendHandler.java** Newland Co. Ltd. All rights reserved.** Description:Rpc客户端处理模块* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;public class MessageSendHandler extends ChannelInboundHandlerAdapter {private ConcurrentHashMapString, MessageCallBack mapCallBack new ConcurrentHashMapString, MessageCallBack();private volatile Channel channel;private SocketAddress remoteAddr;public Channel getChannel() {return channel;}public SocketAddress getRemoteAddr() {return remoteAddr;}public void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);this.remoteAddr this.channel.remoteAddress();}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);this.channel ctx.channel();}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageResponse response (MessageResponse) msg;String messageId response.getMessageId();MessageCallBack callBack mapCallBack.get(messageId);if (callBack ! null) {mapCallBack.remove(messageId);callBack.over(response);}}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}public void close() {channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}public MessageCallBack sendRequest(MessageRequest request) {MessageCallBack callBack new MessageCallBack(request);mapCallBack.put(request.getMessageId(), callBack);channel.writeAndFlush(request);return callBack;}
} 最后给出RPC服务端的实现。首先是通过spring自动加载RPC服务接口、接口实现容器绑定加载初始化Netty主/从线程池等操作具体是通过MessageRecvExecutor模块实现的现在给出实现代码 /*** filename:MessageRecvExecutor.java** Newland Co. Ltd. All rights reserved.** Description:Rpc服务器执行模块* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {private String serverAddress;private final static String DELIMITER :;private MapString, Object handlerMap new ConcurrentHashMapString, Object();private static ThreadPoolExecutor threadPoolExecutor;public MessageRecvExecutor(String serverAddress) {this.serverAddress serverAddress;}public static void submit(Runnable task) {if (threadPoolExecutor null) {synchronized (MessageRecvExecutor.class) {if (threadPoolExecutor null) {threadPoolExecutor (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);}}}threadPoolExecutor.submit(task);}public void setApplicationContext(ApplicationContext ctx) throws BeansException {try {MessageKeyVal keyVal (MessageKeyVal) ctx.getBean(Class.forName(newlandframework.netty.rpc.model.MessageKeyVal));MapString, Object rpcServiceObject keyVal.getMessageKeyVal();Set s rpcServiceObject.entrySet();IteratorMap.EntryString, Object it s.iterator();Map.EntryString, Object entry;while (it.hasNext()) {entry it.next();handlerMap.put(entry.getKey(), entry.getValue());}} catch (ClassNotFoundException ex) {java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);}}public void afterPropertiesSet() throws Exception {//netty的线程池模型设置成主从线程池模式这样可以应对高并发请求//当然netty还支持单线程、多线程网络IO模型可以根据业务需求灵活配置ThreadFactory threadRpcFactory new NamedThreadFactory(NettyRPC ThreadFactory);//方法返回到Java虚拟机的可用的处理器数量int parallel Runtime.getRuntime().availableProcessors() * 2;EventLoopGroup boss new NioEventLoopGroup();EventLoopGroup worker new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());try {ServerBootstrap bootstrap new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new MessageRecvChannelInitializer(handlerMap)).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);String[] ipAddr serverAddress.split(MessageRecvExecutor.DELIMITER);if (ipAddr.length 2) {String host ipAddr[0];int port Integer.parseInt(ipAddr[1]);ChannelFuture future bootstrap.bind(host, port).sync();System.out.printf([author tangjie] Netty RPC Server start success ip:%s port:%d\n, host, port);future.channel().closeFuture().sync();} else {System.out.printf([author tangjie] Netty RPC Server start fail!\n);}} finally {worker.shutdownGracefully();boss.shutdownGracefully();}}
} 最后还是老规矩给出RPC服务端消息编码、解码、处理的核心模块代码实现具体如下 /*** filename:MessageRecvChannelInitializer.java** Newland Co. Ltd. All rights reserved.** Description:Rpc服务端管道初始化* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;public class MessageRecvChannelInitializer extends ChannelInitializerSocketChannel {//ObjectDecoder 底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候//消息头开始即为长度字段占据4个字节。这里出于保持兼容的考虑final public static int MESSAGE_LENGTH 4;private MapString, Object handlerMap null;MessageRecvChannelInitializer(MapString, Object handlerMap) {this.handlerMap handlerMap;}protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder//的初始化参数即为super(maxObjectSize, 0, 4, 0, 4); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));//利用LengthFieldPrepender回填补充ObjectDecoder消息报文头pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));pipeline.addLast(new ObjectEncoder());//考虑到并发性能采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));pipeline.addLast(new MessageRecvHandler(handlerMap));}
} /*** filename:MessageRecvHandler.java** Newland Co. Ltd. All rights reserved.** Description:Rpc服务器消息处理* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;public class MessageRecvHandler extends ChannelInboundHandlerAdapter {private final MapString, Object handlerMap;public MessageRecvHandler(MapString, Object handlerMap) {this.handlerMap handlerMap;}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageRequest request (MessageRequest) msg;MessageResponse response new MessageResponse();MessageRecvInitializeTask recvTask new MessageRecvInitializeTask(request, response, handlerMap, ctx);//不要阻塞nio线程复杂的业务逻辑丢给专门的线程池MessageRecvExecutor.submit(recvTask);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//网络有异常要关闭通道ctx.close();}
} /*** filename:MessageRecvInitializeTask.java** Newland Co. Ltd. All rights reserved.** Description:Rpc服务器消息线程任务处理* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.beanutils.MethodUtils;public class MessageRecvInitializeTask implements Runnable {private MessageRequest request null;private MessageResponse response null;private MapString, Object handlerMap null;private ChannelHandlerContext ctx null;public MessageResponse getResponse() {return response;}public MessageRequest getRequest() {return request;}public void setRequest(MessageRequest request) {this.request request;}MessageRecvInitializeTask(MessageRequest request, MessageResponse response, MapString, Object handlerMap, ChannelHandlerContext ctx) {this.request request;this.response response;this.handlerMap handlerMap;this.ctx ctx;}public void run() {response.setMessageId(request.getMessageId());try {Object result reflect(request);response.setResult(result);} catch (Throwable t) {response.setError(t.toString());t.printStackTrace();System.err.printf(RPC Server invoke error!\n);}ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture channelFuture) throws Exception {System.out.println(RPC Server Send message-id respone: request.getMessageId());}});}private Object reflect(MessageRequest request) throws Throwable {String className request.getClassName();Object serviceBean handlerMap.get(className);String methodName request.getMethodName();Object[] parameters request.getParameters();return MethodUtils.invokeMethod(serviceBean, methodName, parameters);}
} 然后是RPC消息处理的回调实现模块代码 /*** filename:MessageCallBack.java** Newland Co. Ltd. All rights reserved.** Description:Rpc消息回调* author tangjie* version 1.0**/
package newlandframework.netty.rpc.core;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;public class MessageCallBack {private MessageRequest request;private MessageResponse response;private Lock lock new ReentrantLock();private Condition finish lock.newCondition();public MessageCallBack(MessageRequest request) {this.request request;}public Object start() throws InterruptedException {try {lock.lock();//设定一下超时时间rpc服务器太久没有相应的话就默认返回空吧。finish.await(10*1000, TimeUnit.MILLISECONDS);if (this.response ! null) {return this.response.getResult();} else {return null;}} finally {lock.unlock();}}public void over(MessageResponse reponse) {try {lock.lock();finish.signal();this.response reponse;} finally {lock.unlock();}}
} 到此为止NettyRPC的关键部分服务端、客户端的模块已经通过Netty全部实现了。现在给出spring加载配置rpc-invoke-config.xml的内容 ?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdcontext:component-scan base-packagenewlandframework.netty.rpc.core/context:property-placeholder locationclasspath:newlandframework/netty/rpc/config/rpc-server.properties/bean idrpcbean classnewlandframework.netty.rpc.model.MessageKeyValproperty namemessageKeyValmapentry keynewlandframework.netty.rpc.servicebean.Calculateref beancalc//entry/map/property/beanbean idcalc classnewlandframework.netty.rpc.servicebean.CalculateImpl/bean idrpcServer classnewlandframework.netty.rpc.core.MessageRecvExecutorconstructor-arg nameserverAddress value${rpc.server.addr}//bean
/beans 再贴出RPC服务绑定ip信息的配置文件rpc-server.properties的内容。 #rpc servers ip address config
rpc.server.addr127.0.0.1:18888 最后NettyRPC服务端启动方式参考如下 new ClassPathXmlApplicationContext(newlandframework/netty/rpc/config/rpc-invoke-config.xml); 如果一切顺利没有出现意外的话控制台上面会出现如下截图所示的情况 如果出现了说明NettyRPC服务器已经启动成功 上面基于Netty的RPC服务器并发处理性能如何呢实践是检验真理的唯一标准下面我们就来实战一下。 下面的测试案例是基于RPC远程调用两数相加函数并返回计算结果。客户端同时开1W个线程同一时刻瞬时发起并发计算请求然后观察Netty的RPC服务器是否有正常应答回复响应以及客户端是否有正常返回调用计算结果。值得注意的是测试案例是基于1W个线程瞬时并发请求而设计的并不是1W个线程循环发起请求。这两者对于衡量RPC服务器的并发处理性能还是有很大差别的。当然前者对于并发性能的处理要求要高上很多很多。 现在先给出RPC计算接口、RPC计算接口实现类的代码实现 /*** filename:Calculate.java** Newland Co. Ltd. All rights reserved.** Description:计算器定义接口* author tangjie* version 1.0**/
package newlandframework.netty.rpc.servicebean;public interface Calculate {//两数相加int add(int a, int b);
} /*** filename:CalculateImpl.java** Newland Co. Ltd. All rights reserved.** Description:计算器定义接口实现* author tangjie* version 1.0**/
package newlandframework.netty.rpc.servicebean;public class CalculateImpl implements Calculate {//两数相加public int add(int a, int b) {return a b;}
} 下面是瞬时并发RPC请求的测试样例 /*** filename:CalcParallelRequestThread.java** Newland Co. Ltd. All rights reserved.** Description:并发线程模拟* author tangjie* version 1.0**/
package newlandframework.netty.rpc.servicebean;import newlandframework.netty.rpc.core.MessageSendExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;public class CalcParallelRequestThread implements Runnable {private CountDownLatch signal;private CountDownLatch finish;private MessageSendExecutor executor;private int taskNumber 0;public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {this.signal signal;this.finish finish;this.taskNumber taskNumber;this.executor executor;}public void run() {try {signal.await();Calculate calc executor.execute(Calculate.class);int add calc.add(taskNumber, taskNumber);System.out.println(calc add result:[ add ]);finish.countDown();} catch (InterruptedException ex) {Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);}}
} /*** filename:RpcParallelTest.java** Newland Co. Ltd. All rights reserved.** Description:rpc并发测试代码* author tangjie* version 1.0**/
package newlandframework.netty.rpc.servicebean;import java.util.concurrent.CountDownLatch;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import org.apache.commons.lang.time.StopWatch;public class RpcParallelTest {public static void main(String[] args) throws Exception {final MessageSendExecutor executor new MessageSendExecutor(127.0.0.1:18888);//并行度10000int parallel 10000;//开始计时StopWatch sw new StopWatch();sw.start();CountDownLatch signal new CountDownLatch(1);CountDownLatch finish new CountDownLatch(parallel);for (int index 0; index parallel; index) {CalcParallelRequestThread client new CalcParallelRequestThread(executor, signal, finish, index);new Thread(client).start();}//10000个并发线程瞬间发起请求操作signal.countDown();finish.await();sw.stop();String tip String.format(RPC调用总共耗时: [%s] 毫秒, sw.getTime());System.out.println(tip);executor.stop();}
} 好了现在先启动NettyRPC服务器确认没有问题之后运行并发RPC请求客户端看下客户端打印的计算结果以及处理耗时。 从上面来看10000个瞬时RPC计算请求总共耗时接近11秒。我们在来看下NettyRPC的服务端运行情况如下所示 可以很清楚地看到RPC服务端都有收到客户端发起的RPC计算请求并返回消息应答。 最后我们还是要分别验证一下RPC服务端是否存在丢包、粘包、IO阻塞的情况1W个并发计算请求是否成功接收处理并应答了实际情况说明一切看下图所示 非常给力RPC的服务端确实成功接收到了客户端发起的1W笔瞬时并发计算请求并且成功应答处理了。并没有出现丢包、粘包、IO阻塞的情况。再看下RPC客户端是否成功得到计算结果的应答返回了呢 很好RPC的客户端确实收到了RPC服务端计算的1W笔加法请求的计算结果而且耗时接近11秒。由此可见基于Netty业务线程池的NettyRPC服务器应对并发多线程RPC请求处理起来是得心应手游刃有余 最后本文通过Netty这个NIO框架实现了一个很简单的“高性能”的RPC服务器代码虽然写出来了但是还是有一些值得改进的地方比如 1、对象序列化传输可以支持目前主流的序列化框架protobuf、JBoss Marshalling、Avro等等。 2、Netty的线程模型可以根据业务需求进行定制。因为并不是每笔业务都需要这么强大的并发处理性能。 3、目前RPC计算只支持一个RPC服务接口映射绑定一个对应的实现后续要支持一对多的情况。 4、业务线程池的启动参数、线程池并发阻塞容器模型等等可以配置化管理。 5、Netty的Handler处理部分对于复杂的业务逻辑现在是统一分派到特定的线程池进行后台异步处理。当然你还可以考虑JMS消息队列方式进行解耦统一分派给消息队列的订阅者统一处理。目前实现JMS的开源框架也有很多ActiveMQ、RocketMQ等等都可以考虑。 本文实现的NettyRPC对于面前的您而言一定还有很多地方可以加以完善和改进优化改进的工作就交给您自由发挥了。 由于本人技术能力、认知水平有限。本文中有说不对的地方恳请园友们批评指正不吝赐教最后感谢面前的您耐心的阅读完本文相信现在的你对于Java开发高性能的服务端应用又有了一个更深入的了解本文算是对我Netty学习成果的阶段性总结后续有时间我还会继续推出Netty工业级开发的相关文章敬请期待 PS还有兴趣的朋友可以参考、阅读一下我的另外一篇文章Netty实现高性能RPC服务器优化篇之消息序列化。此外自从在博客园发表了两篇基于Netty开发高性能RPC服务器的文章之后本人收到很多园友们索要源代码进行学习交流的请求。为了方便大家本人把NettyRPC的代码开源托管到github上面欢迎有兴趣的朋友一起学习、研究 附上NettyRPC项目的下载路径https://github.com/tang-jie/NettyRPC Netty工业级开发系列文章进阶Netty构建分布式消息队列AvatarMQ设计指南之架构篇 谈谈如何使用Netty开发实现高性能的RPC服务器、Netty实现高性能RPC服务器优化篇之消息序列化。这两篇文章主要设计的思路是基于Netty构建了一个高性能的RPC服务器而这些前期代码的准备工作主要是为了设计、实现一个基于Netty的分布式消息队列系统做铺垫本人把这个分布式消息队列系统命名为AvatarMQ。作为Netty工业级开发系列的进阶篇感兴趣的朋友可以点击关注Netty构建分布式消息队列AvatarMQ设计指南之架构篇一定不会让您失望 AvatarMQ项目开源网址https://github.com/tang-jie/AvatarMQ。