网站开发工程师求职信,用织梦网站后台发布文章为什么还需要审核,网站建设情况说明,中华商标交易网官方网站前言 Dubbo使用Netty作为网络调用框架#xff0c;Netty是一个Reactor模型的框架#xff0c;线程模型分为boss线程池和worker线程池#xff0c;boss线程池负责监听、分配事件#xff0c;worker线程池负责处理事件#xff0c;简单说就是boss线程池负责hold请求#xff0c;并…
前言 Dubbo使用Netty作为网络调用框架Netty是一个Reactor模型的框架线程模型分为boss线程池和worker线程池boss线程池负责监听、分配事件worker线程池负责处理事件简单说就是boss线程池负责hold请求并分发到worker池worker线程池负责处理具体事件。 dubbo在原本的netty中的线程boss线程和worker做了一些修改将其定义为io线程而后由实现了一套用于处理业务的业务线程池这就和上一篇介绍的Dubbo协议下的服务端线程模型产生了关联dubbo的io线程监听请求业务处理由dubbo自定义的线程池处理这里将请求分发到具体的业务线程池就是由Dispatcher实现的默认是AllDispatcher上一篇已经简单介绍了Dubbo协议的线程池的分发模型这篇文章就介绍下Dubbo究竟自定义了哪几种线程池的实现并且都是怎么实现的。
注Apache Dubbo版本为3.0.7
Dubbo线程池接口ThreadPool Dubbo自定义的线程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool并且提供了四种实现分别是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPoolThreadPool接口是SPI的如果不指定线程池的具体实现默认是fixed在项目中配置如下配置线程池类型是fixed线程数为100线程模型是all xml
复制代码
dubbo:protocol namedubbo dispatcherall threadpoolfixed threads100 /
ThreadPool代码如下接下来分别简单介绍一下四种线程池的具体实现 java
复制代码
SPI(value fixed, scope ExtensionScope.FRAMEWORK)
public interface ThreadPool { /** * Thread pool * * param url URL contains thread parameter* return thread pool */ Adaptive({THREADPOOL_KEY}) Executor getExecutor(URL url);
}
CachedThreadPool缓存线程池 该线程池是缓存类型的当空闲到一定时间时会将线程删掉使用时再创建具体dubbo的实现如下代码实现很简单就是使用JUC的ThreadPoolExecutor创建了一个缓存类型的线程池将maximumPoolSize设置成Integer.MAX_VALUEkeepAliveTime设置成60000毫秒队列大小设置成0当超过任务数超过corePoolSize就会直接创建worker线程当线程空闲60s后就会被销毁。
public class CachedThreadPool implements ThreadPool {Overridepublic Executor getExecutor(URL url) {String name url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads url.getParameter(THREADS_KEY, Integer.MAX_VALUE);int queues url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);int alive url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,queues 0 ? new SynchronousQueueRunnable() :(queues 0 ? new LinkedBlockingQueueRunnable(): new LinkedBlockingQueueRunnable(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
FixedThreadPool固定线程数的线程池 该线程池是固定线程数的线程池实现具体实现也是使用JUC的ThreadPoolExecutor创建了一个固定线程数的线程池通过url中配置的threads将corePoolSize和maximumPoolSize都设置成threads的数量并且keepAliveTime设置成0。
public class FixedThreadPool implements ThreadPool {Overridepublic Executor getExecutor(URL url) {String name url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int threads url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues 0 ? new SynchronousQueueRunnable() :(queues 0 ? new LinkedBlockingQueueRunnable(): new LinkedBlockingQueueRunnable(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
}
LimitedThreadPool可伸缩线程池 虽然叫可伸缩线程池但是实际上只能伸不能缩官网上说是为了突然大量的流量引起性能问题具体实现就是将keepAliveTime设置成无限大这样当队列满了后就会创建线程达到maximumPoolSize新创建的这些线程因为keepAliveTime设置成无限大所以也不会销毁了。
public class LimitedThreadPool implements ThreadPool {Overridepublic Executor getExecutor(URL url) {String name url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues 0 ? new SynchronousQueueRunnable() :(queues 0 ? new LinkedBlockingQueueRunnable(): new LinkedBlockingQueueRunnable(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}EagerThreadPool Eager单词是渴望的热切地的意思这个线程池所实现的逻辑是当任务数超过corePoolSize但小于maximumPoolSize时不是将新任务放到队列中而是优先创建新的worker线程当线程数已经达到maximumPoolSize接下来新的任务才会放到阻塞队列中阻塞队列满了会抛出RejectedExecutionException。 EagerThreadPool线程池就不是通过JUC的ThreadPoolExecutor实现的了而是继承ThreadPoolExecutor自己实现一些逻辑下面一步一步看。
EagerThreadPool Dubbo自己实现了阻塞队列TaskQueue和线程池EagerThreadPoolExecutor从EagerThreadPool的代码中看不到该类型线程池的核心逻辑核心逻辑是在TaskQueue代码中这里跳过直接看TaskQueue代码。
public class EagerThreadPool implements ThreadPool {Overridepublic Executor getExecutor(URL url) {String name url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads url.getParameter(THREADS_KEY, Integer.MAX_VALUE);int queues url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);int alive url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);// init queue and executorTaskQueueRunnable taskQueue new TaskQueueRunnable(queues 0 ? 1 : queues);EagerThreadPoolExecutor executor new EagerThreadPoolExecutor(cores,threads,alive,TimeUnit.MILLISECONDS,taskQueue,new NamedInternalThreadFactory(name, true),new AbortPolicyWithReport(name, url));taskQueue.setExecutor(executor);return executor;}
}TaskQueue Dubbo的EagerThreadPool是通过TaskQueue的offer方法实现的逻辑就是当提交到线程池任务时如果任务数大于corePoolSize会将任务offer到TaskQueue中这时如果活跃的线程数大于等于线程池大小并且当前线程数小于maximumPoolSize时就会伪装成放入到队列失败这时线程池就会创建线程从而实现超过corePoolSize不超过maximumPoolSize时创建worker线程而不是将任务放入到队列中。
public class TaskQueueR extends Runnable extends LinkedBlockingQueueRunnable {private static final long serialVersionUID -2635853580887179627L;private EagerThreadPoolExecutor executor;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(EagerThreadPoolExecutor exec) {executor exec;}Overridepublic boolean offer(Runnable runnable) {if (executor null) {throw new RejectedExecutionException(The task queue does not have executor!);}int currentPoolThreadSize executor.getPoolSize();// have free worker. put task into queue to let the worker deal with task.if (executor.getActiveCount() currentPoolThreadSize) {return super.offer(runnable);}// 伪装放入队列失败让线程池创建线程if (currentPoolThreadSize executor.getMaximumPoolSize()) {return false;}// currentPoolThreadSize maxreturn super.offer(runnable);}/*** retry offer task** param o task* return offer success or not* throws RejectedExecutionException if executor is terminated.*/public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (executor.isShutdown()) {throw new RejectedExecutionException(Executor is shutdown!);}return super.offer(o, timeout, unit);}
}
EagerThreadPoolExecutor 当任务数大于maximumPoolSize时线程池会抛出RejectedExecutionExceptionEagerThreadPoolExecutor捕获这个异常并且调用TaskQueue的retryOffer方法尝试放入队列这样就实现了当线程数已经达到maximumPoolSize接下来新的任务才会放到阻塞队列中阻塞队列满了会抛出RejectedExecutionException代码如下 public class EagerThreadPoolExecutor extends ThreadPoolExecutor {public EagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, TaskQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}Overridepublic void execute(Runnable command) {if (command null) {throw new NullPointerException();}try {super.execute(command);} catch (RejectedExecutionException rx) {// 重新尝试将任务放到队列中.final TaskQueue queue (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {throw new RejectedExecutionException(Queue capacity is full., rx);}} catch (InterruptedException x) {throw new RejectedExecutionException(x);}}}
}总结 Dubbo实现了自定义线程池其核心接口是ThreadPool该接口是SPI的默认的实现是fixedDubbo提供了四种实现分别是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPool。