做网站需要的带宽上行还是下行,湖南竞网科技有限公司,html编辑器哪个好用,搭建网站需要什么软件连接池导致内存泄漏案例演示
简介
我们生产环境常常会用Netty客户端作为连接工具#xff0c;尽管Netty强大且方便#xff0c;但是使用不当的话也可能造成严重的生成事故。笔者本文就以一个连接池使用不当导致内存泄漏的案例来展开探讨。
问题复现
服务端代码
我们先贴出…连接池导致内存泄漏案例演示
简介
我们生产环境常常会用Netty客户端作为连接工具尽管Netty强大且方便但是使用不当的话也可能造成严重的生成事故。笔者本文就以一个连接池使用不当导致内存泄漏的案例来展开探讨。
问题复现
服务端代码
我们先贴出服务端代码代码非常简单就是启动然后处理客户端请求。
public class Server {public static void main(String[] args) {NioEventLoopGroup bossGroup new NioEventLoopGroup(1);NioEventLoopGroup workerGroup new NioEventLoopGroup();try {ServerBootstrap bootstrap new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler()).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();pipeline.addLast(new LoggingHandler());}});ChannelFuture channelFuture bootstrap.bind(9999).sync();//channelFuture.channel().closeFuture().sync();channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println(链路关闭);bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}});/*TimeUnit.SECONDS.sleep(4);channelFuture.channel().close();*/} catch (Exception e) {e.printStackTrace();} finally {/*bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();*/}}
}服务端代码
服务端代码如下这里笔者用poolSize来模拟客户端并发请求通过传入的poolSize创建poolSize个客户端和服务端建立连接。
public class Client {static void initPool(int poolSize) {for (int i 0; i poolSize; i) {NioEventLoopGroup bossGroup new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.group(bossGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)//.handler(new LoggingHandler()).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 9999).sync();channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println(链路关闭);//bossGroup.shutdownGracefully();channelFuture.channel().close();}});} catch (Exception e) {e.printStackTrace();}}}}完成上述编码之后我们编写启动代码可以看到笔者这里启动了200个并发请求。
public static void main(String[] args) {try {TimeUnit.SECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}initPool(200);System.out.println(连接池创建成功);}启动并重现问题
完成编码工作之后我们先把服务端启动然后为了更快的重现问题我们在启动客户端之前需对客户端配置如下参数调整堆区
-Xmn32m -Xmx32m启动后不久我们就发现客户端并发请求服务端时抛出内存泄漏的错误
Exception in thread nioEventLoopGroup-23-1 Exception in thread nioEventLoopGroup-28-1 Exception in thread nioEventLoopGroup-28-16 java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread RMI TCP Connection(idle) Exception in thread nioEventLoopGroup-28-8 java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread main java.lang.OutOfMemoryError: Java heap spaceat io.netty.channel.nio.SelectedSelectionKeySet.init(SelectedSelectionKeySet.java:29)at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:207)at io.netty.channel.nio.NioEventLoop.init(NioEventLoop.java:149)at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)at io.netty.util.concurrent.MultithreadEventExecutorGroup.init(MultithreadEventExecutorGroup.java:84)at io.netty.util.concurrent.MultithreadEventExecutorGroup.init(MultithreadEventExecutorGroup.java:58)at io.netty.channel.MultithreadEventLoopGroup.init(MultithreadEventLoopGroup.java:52)at io.netty.channel.nio.NioEventLoopGroup.init(NioEventLoopGroup.java:87)at io.netty.channel.nio.NioEventLoopGroup.init(NioEventLoopGroup.java:82)at io.netty.channel.nio.NioEventLoopGroup.init(NioEventLoopGroup.java:63)at io.netty.channel.nio.NioEventLoopGroup.init(NioEventLoopGroup.java:51)at io.netty.channel.nio.NioEventLoopGroup.init(NioEventLoopGroup.java:43)at com.mx.tuning.case2.Client.initPool(Client.java:29)at com.mx.tuning.case2.Client.main(Client.java:70)
Exception in thread nioEventLoopGroup-26-1 java.lang.OutOfMemoryError: GC overhead limit exceeded连接池泄漏原因详解
我们打开jvisualvm查看客户端线程信息可以看到客户端创建无数个独立的NioEventLoopGroup。 查看server却发现都是使用同一个NioEventLoopGroup每个请求都是通过NioEventLoopGroup中的一个线程去处理的。 很明显造成客户端连接池泄漏的原因就是我们错用的线程池我们每个客户端发起请求时用的都是各自的NioEventLoopGroup这不仅使得连接池没有复用更使得nio模型用起来和bio没有区别。 对此我们不妨看看NioEventLoopGroup的源码先从构造方法入手可以看到默认情况下会调用一个 this(0); public NioEventLoopGroup() {this(0);}经过我们的不断步入即可发现默认情况下EventLoopGroup会创建DEFAULT_EVENT_LOOP_THREADS 个线程
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}我们不妨通过源码看看DEFAULT_EVENT_LOOP_THREADS 的线程数。如下图默认情况下线程池数是CPU核心数的2倍。 以笔者为例笔者的CPU核心数为16那么最终结果则是32。 这一点我们完全可以将Netty源码的结果输出打印出来
System.out.println(thread numMath.max(1, SystemPropertyUtil.getInt(io.netty.eventLoopThreads, NettyRuntime.availableProcessors() * 2)));可以看到结果确实是32 完成线程数初始化的工作之后源码就会为ThreadPerTaskExecutor初始化对应个数的执行器处理后续的各种异步任务。 解决方案
可以想到一共200个请求都会创建32个线程在32m的内存空间是多么可怕的一件事所以我们不妨对客户端做一个调整使得每一个请求都可以复用一个NioEventLoopGroup。
改造后的代码如下所示可以看到笔者将循环建立到连接操作上这样一来就确保的所有的客户端请求都复用一个NioEventLoopGroup。 再次启动我们就发现客户端所有连接都成功了使用监控工具查看线程池也没有问题线程数确实是CPU核心数的2倍。 控制台也输出连接成功了。 补充注意事项
这里我们补充一个注意事项可以看到笔者对每一个请求结束后的关闭方法并不是将bossGroup关闭而是将每个客户端对应的管道即channelFuture.channel()关闭。这样一来我们也确保了一个连接报错之后不会将其他连接对应的NioEventLoopGroup关闭了。 更进一步:客户端连接源码详解
Netty客户端创建原理
图解
了解了客户端连接池的错误使用案例之后我们不妨对客户端创建进行进一步的了解先来看看下面这张经典的时序图可以看到Netty客户端建立连接的方式大抵是以下几个步骤:
创建客户端构建NIO线程组这也就是我们说的创建NioEventLoopGroup反射创建频道创建频道流水线管理要处理的管道上述初始化完成之后通过异步的方式发起TCP连接然后异步处理连接连接操作成功后发起连接操作结果事件调用我们编写的业务代码的handler 源码验证
这里我们不妨对几个比较核心的步骤通过源码的方式进行一下分析创建NioEventLoopGroup这一步我们就不多说了上文已经说明了感兴趣的读者可以自行进一步查看源码。
然后就是反射创建频道这一步在源码中的这个位置可以看到这个代码我们会将管道的类传入 步入源码我们也能发现这块代码会通过一个反射工厂完成频道的创建。 后续频道流水线初始化这里不是重点我们这里简单查看一下即可 服务端流水线操作也是同理通过addLast方法来编排频道的排序。 自此我们将客户端创建的流程整体有了整体的了解下面我们不妨进一步了解一下客户端连接的工作流程。
Netyy客户端注册源码详解
上文我们大概的过了初始化配置的源码接下来我们就来了解一下bootstrap连接服务器的原理。我们首先将服务端代码启动。
然后调整一下客户端启动代码再将其启动 public static void main(String[] args) {initPool(1);System.out.println(连接池创建成功);}然后我们在connect处插入断点并将客户端代码启动 此时不断步入我们的代码走到了Bootstrap的connect方法启动有一个doResolveAndConnect我们不妨步入查看详情。 public ChannelFuture connect(SocketAddress remoteAddress) {......return doResolveAndConnect(remoteAddress, config.localAddress());}然后代码会走到一个initAndRegister方法该方法完成之后会生成一个ChannelFuture 的异步任务任务发起后后续代码会注册一个监听器监听注册结果以及根据注册结果发起真正的远程连接代码我们先不妨看看异步任务生成的逻辑方法initAndRegister为我们做了些什么。 private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {final ChannelFuture regFuture initAndRegister();
....if (regFuture.isDone()) {..........} else {......//上述异步注册任务发起后设置一个监听器一旦上述注册任务完成就会执行监听器中的doResolveAndConnect0方法发起连接regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause future.cause();if (cause ! null) {......} else {//发起客户端连接服务端doResolveAndConnect0promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}可以看到该方法内部为调用一个register方法我们不妨看看这个register做了些什么。 final ChannelFuture initAndRegister() {Channel channel null;.....ChannelFuture regFuture config().group().register(channel);......}我们不断这深入这段代码,可以看到AbstractChannel中的register会通过eventLoop提交一个register0的任务我们不妨看看eventLoop的execute方法做了些什么。 Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {AbstractChannel.this.eventLoop eventLoop;if (eventLoop.inEventLoop()) {.....} else {try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {......}}}可以看到execute方法执行了两段核心代码:
addTask,即将我们register0这个任务提交到队列中调用startThread启动NioEventLoop获取并运行队列中的任务。
所以我们不妨看看startThread是如何启动线程去执行我们的register这个任务的。
Overridepublic void execute(Runnable task) {....//添加任务到队列中addTask(task);if (!inEventLoop) {// 如果NioEventLoop没启动则启动让其跑任务startThread();if (isShutdown() removeTask(task)) {reject();}}......}可以看到startThread方法调用了doStartThread去启动线程我们不妨看看它做了些什么。
private void startThread() {if (state ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {doStartThread();} catch (Throwable cause) {......}}}}由于代码比较长笔者这里便使用图片的形式来展示一下代码可以看到这段代码中会通过threadPerTaskExecutor提交一个任务该任务便是获取任务中的线程然后调用 SingleThreadEventExecutor.this.run();方法读者如果查看一下this对象即可发现这个this就是我们的NioEventLoop我们不妨看看run方法内部做了些什么。 步入源码后我们会发现该run方法是一个无限的for循环会获取本次通到事件的key 因为我们是初次建立连接所以代码走到了这里内部没有做任务事情所以笔者直接略过这段代码。然后就走到了下图的runAllTasks 最终代码就会从队列中取出我们的register0的任务并执行。 由此走向了一个闭环。 自此我们在这里做一个小结整个任务的流程:
调用connect方法。调用initAndRegister封装并提交一个register0的异步任务。添加上述任务到队列中。启动NioEventLoop线程去处理提交到队列中的任务。NioEventLoop获取通到事件得到上述的register0。执行register0。
Netty注册源码核心异步执行流程
上文中我们代码最终走到了register0我们不妨看看register0做了些什么。经过笔者的整理可以看到该方法大抵做了3个步骤:
调用doRegister流水线注册新的handler即调用invokeHandlerAddedIfNeeded对promise发起连接注册成功后的通知即调用safeSetSuccess方法。
所以我们不妨看看doRegister做了些什么
private void register0(ChannelPromise promise) {try {.......doRegister();
.......// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();.........}} catch (Throwable t) {......}}可以看到doRegister会调用register最终生成一个通到事件的key赋值给selectionKey ,我们不妨看看生成key之前register方法做了些什么
Overrideprotected void doRegister() throws Exception {boolean selected false;for (;;) {try {selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {........}}}然后我们就来到了register方法因为是第一次注册所以k的值为null代码走到register执行事件注册一个连接事件然后返回这个key。
public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException{synchronized (regLock) {.......SelectionKey k findKey(sel);if (k ! null) {.....}if (k null) {.....synchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}}自此我们的doRegister即注册都完成了继续进行执行后续步骤通知客户端可以真正开始连接了。 可以看到代码走到了promise的trySuccess方法我们不妨步入看看这个方法为我们做了些什么
protected final void safeSetSuccess(ChannelPromise promise) {if (!(promise instanceof VoidChannelPromise) !promise.trySuccess()) {logger.warn(Failed to mark a promise as success because it is done already: {}, promise);}}可以看到该段代码会调用一个名为notifyListeners区通知上文注册register监听器。
public boolean trySuccess(V result) {if (setSuccess0(result)) {notifyListeners();return true;}return false;}查看该通知方法我们可以看到调用了notifyListener0方法。 然后代码就走到了DefaultPromise的notifyListener0方法它将会调用operationComplete方法他将会将注册状态设置为true并让客户端执行连接方法我们不妨步入看看
private static void notifyListener0(Future future, GenericFutureListener l) {try {l.operationComplete(future);} catch (Throwable t) {.....}}}可以看到代码最终走到了我们一开始所说的哪个异步任务的后续注册监听的内部逻辑代码中Bootstrap的doResolveAndConnect 它做了以下两件事:
将注册结果设置为true即调用 promise.registered();执行连接逻辑doResolveAndConnect0我们不妨看看内部执行细节。 可以看到其内部核心步骤就是调用doConnect方法。 同理它也是向eventLoopGroup提交一个连接的异步任务,如下图这里的execute仍然会执行我们上述步骤中的addTask然后NioEventLoop轮询通到事件的过程。 之后代码就执行到上图的channel.connect(remoteAddress, connectPromise); 然后这个连接方法就会执行doConnect将OP_CONNECT注册到通到事件中。 Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress ! null) {doBind0(localAddress);}boolean success false;try {boolean connected SocketUtils.connect(javaChannel(), remoteAddress);if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);}success true;return connected;} finally {if (!success) {doClose();}}}最终NioEventLoop轮询到上述所说的注册的感兴趣的连接事件完成finishConnect工作自此所有连接工作完成。 小结
自此我们的整个客户端连接工作就完成了我们不妨小结整个流程: 参考
Java性能调优 6步实现项目性能升级