石家庄网站建设机构,做展示类网站,刷网站流量有用吗,全球速卖通一、corePoolSize核心线程数
二、maximunPoolSize最大线程数
三、keepAliveTime空闲线程存活时间
四、unit空闲线程存活时间的单位
五、workQueue线程工作队列
1、ArrayBlockingQueue FIFO有界阻塞队列
2、LinkedBlockingQueue FIFO无限队列
3、PriorityBlockingQueue V…
一、corePoolSize核心线程数
二、maximunPoolSize最大线程数
三、keepAliveTime空闲线程存活时间
四、unit空闲线程存活时间的单位
五、workQueue线程工作队列
1、ArrayBlockingQueue FIFO有界阻塞队列
2、LinkedBlockingQueue FIFO无限队列
3、PriorityBlockingQueue VIP
4、SynchronousQueue不缓存任务的阻塞队列
六、threadFactory线程工厂
七、handler超出线程数和工作队列时候的任务请求处理策略
策略1ThreadPoolExecutor.AbortPolicy默认拒绝执行
策略2ThreadPoolExecutor.CallerRunsPolicy调用 execute 方法的线程本身运行任务
策略3ThreadPoolExecutor.DiscardOldestPolicy执行程序未关闭则删除工作队列头部的任务
策略4ThreadPoolExecutor.DiscardPolicy无法执行的任务被简单地删除
八 ThreadPoolExecutor线程池参数设置技巧
1、ThreadPoolExecutor的重要参数
2、ThreadPoolExecutor执行顺序
3、如何设置参数
九 真实环境实践
十 个人总结 JDK1.8线程池参数源代码 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueueRejectedExecutionHandler handler) this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueueExecutors.defaultThreadFactory(), handler);}一、corePoolSize核心线程数
指的是核心线程大小线程池中维护一个最小的线程数量即使这些线程处于空闲状态也一直存在池中除非设置了核心线程超时时间。
这也是源码中的注释说明。
/** param corePoolSize the number of threads to keep in the pool,
* even if they are idle, unless {code allowCoreThreadTimeOut} is set.
*/ 二、maximunPoolSize最大线程数
指的是线程池中允许的最大线程数量。当线程池中核心线程都处理执行状态有新请求的任务 1、工作队列未满新请求的任务加入工作队列 2、工作队列已满线程池会创建新线程来执行这个任务。当然创建新线程不是无限制的因为会受到maximumPoolSize最大线程数量的限制。 三、keepAliveTime空闲线程存活时间
指的是空闲线程存活时间。具体说当线程数大于核心线程数时空闲线程在等待新任务到达的最大时间如果超过这个时间还没有任务请求该空闲线程就会被销毁。
可见官方注释
/** param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.
*/四、unit空闲线程存活时间的单位
是指空闲线程存活时间的单位。keepAliveTime的计量单位。枚举类型TimeUnit类。 五、workQueue线程工作队列
1、ArrayBlockingQueue FIFO有界阻塞队列 基于数组的有界阻塞队列特点FIFO先进先出。 当线程池中已经存在最大数量的线程时候再请求新的任务这时就会将任务加入工作队列的队尾一旦有空闲线程就会取出队头执行任务。因为是基于数组的有界阻塞队列所以可以避免系统资源的耗尽。 那么如果出现有界队列已满最大数量的所有线程都处于执行状态这时又有新的任务请求怎么办呢 这时候会采用Handler拒绝策略对请求的任务进行处理。后面会详细介绍。 2、LinkedBlockingQueue FIFO无限队列 基于链表的无界阻塞队列默认最大容量Integer.MAX_VALUE( )可认为是无限队列特点FIFO。 关于maximumPoolSize参数在工作队列为LinkedBlockingQueue时候是否起作用这个问题我们需要视情况而定 情况①如果指定了工作队列大小比如core2max3workQueue2任务数task5这种情况的最大线程数量的限制是有效的。 情况②如果工作队列大小默认这时maximumPoolSize不起作用因为新请求的任务一直可以加到队列中。 3、PriorityBlockingQueue VIP 优先级无界阻塞队列前面两种工作队列特点都是FIFO而优先级阻塞队列可以通过参数Comparator实现对任务进行排序不按照FIFO执行。 4、SynchronousQueue不缓存任务的阻塞队列 不缓存任务的阻塞队列它实际上不是真正的队列因为它没有提供存储任务的空间。生产者一个任务请求到来会直接执行也就是说这种队列在消费者充足的情况下更加适合。因为这种队列没有存储能力所以只有当另一个线程消费者准备好工作put入队和take出队方法才不会是阻塞状态。 以上四种工作队列跟线程池结合就是一种生产者-消费者 设计模式。生产者把新任务加入工作队列消费者从队列取出任务消费BlockingQueue可以使用任意数量的生产者和消费者这样实现了解耦简化了设计。
六、threadFactory线程工厂 线程工厂创建一个新线程时使用的工厂可以用来设定线程名、是否为daemon线程等。 守护线程(Daemon Thread) 在Java中有两类线程用户线程 (User Thread)、守护线程 (DaemonThread)。 一般建议自定义线程工厂构建线程的时候设置线程的名称这样就在查日志的时候就方便知道是哪个线程执行的代码。 官方使用默认的线程工厂源码如下 /*** The default thread factory*/static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix pool- poolNumber.getAndIncrement() -thread-;}public Thread newThread(Runnable r) {Thread t new Thread(group, r,namePrefix threadNumber.getAndIncrement(),0);if (t.isDaemon()) t.setDaemon(false);if (t.getPriority() ! Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t}}七、handler超出线程数和工作队列时候的任务请求处理策略
Java 并发超出线程数和工作队列时候的任务请求处理策略使用了策略设计模式。
策略1ThreadPoolExecutor.AbortPolicy默认拒绝执行
在默认的处理策略。该处理在拒绝时抛出RejectedExecutionException拒绝执行。 public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {code AbortPolicy}.*/public AbortPolicy() { }/** * Always throws RejectedExecutionException.** param r the runnable task requested to be executed* param e the executor attempting to execute this task* throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException(Task r.toString() rejected from e.toString());}}策略2ThreadPoolExecutor.CallerRunsPolicy调用 execute 方法的线程本身运行任务
调用 execute 方法的线程本身运行任务。这提供了一个简单的反馈控制机制可以降低新任务提交的速度。 public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the callers thread, unless the executor* has been shut down, in which case the task is discarded.** param r the runnable task requested to be executed* param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e)if (!e.isShutdown()) {r.run();}}}策略3ThreadPoolExecutor.DiscardOldestPolicy执行程序未关闭则删除工作队列头部的任务 public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor * * would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded. ** param r the runnable task requested to be executed* param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);} }如果执行程序未关闭则删除工作队列头部的任务然后重试执行(可能再次失败导致重复执行)。
策略4ThreadPoolExecutor.DiscardPolicy无法执行的任务被简单地删除
无法执行的任务被简单地删除将会丢弃当前任务通过源码可以看出该策略不会执行任务操作。 public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** param r the runnable task requested to be executed* param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} }八 ThreadPoolExecutor线程池参数设置技巧
1、ThreadPoolExecutor的重要参数
corePoolSize核心线程数 核心线程会一直存活及时没有任务需要执行当线程数小于核心线程数时即使有线程空闲线程池也会优先创建新线程处理设置allowCoreThreadTimeouttrue默认false时核心线程会超时关闭 queueCapacity任务队列容量阻塞队列 当核心线程数达到最大时新任务会放在队列中排队等待执行 maxPoolSize最大线程数 当线程数corePoolSize且任务队列已满时。线程池会创建新线程来处理任务 当线程数maxPoolSize且任务队列已满时线程池会拒绝处理任务而抛出异常 keepAliveTime线程空闲时间 当线程空闲时间达到keepAliveTime时线程会退出直到线程数量corePoolSize 如果allowCoreThreadTimeouttrue则会直到线程数量0 allowCoreThreadTimeout允许核心线程超时rejectedExecutionHandler任务拒绝处理器 两种情况会拒绝处理任务 当线程数已经达到maxPoolSize切队列已满会拒绝新任务 当线程池被调用shutdown()后会等待线程池里的任务执行完毕再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务会拒绝新任务 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy会抛出异常 ThreadPoolExecutor类有几个内部实现类来处理这类情况 AbortPolicy 丢弃任务抛运行时异常 CallerRunsPolicy 执行任务 DiscardPolicy 忽视什么都不会发生 DiscardOldestPolicy 从队列中踢出最先进入队列最后一个执行的任务 实现RejectedExecutionHandler接口可自定义处理器
2、ThreadPoolExecutor执行顺序
线程池按以下行为执行任务 当线程数小于核心线程数时创建线程。当线程数大于等于核心线程数且任务队列未满时将任务放入任务队列。当线程数大于等于核心线程数且任务队列已满若线程数小于最大线程数创建线程若线程数等于最大线程数抛出异常拒绝任务 3、如何设置参数
默认值 corePoolSize1queueCapacityInteger.MAX_VALUEmaxPoolSizeInteger.MAX_VALUEkeepAliveTime60sallowCoreThreadTimeoutfalserejectedExecutionHandlerAbortPolicy() 如何来设置
需要根据几个值来决定 tasks 每秒的任务数假设为500~1000 taskcost每个任务花费时间假设为0.1s responsetime系统允许容忍的最大响应时间假设为1s 做几个计算 corePoolSize 每秒需要多少个线程处理 threadcount tasks/(1/taskcost) tasks*taskcout (500~1000)*0.1 50~100 个线程。corePoolSize设置应该大于50 根据8020原则如果80%的每秒任务数小于800那么corePoolSize设置为80即可 queueCapacity (coreSizePool/taskcost)*responsetime 计算可得 queueCapacity 80/0.1*1 80。意思是队列里的线程可以等待1s超过了的需要新开线程来执行 切记不能设置为Integer.MAX_VALUE这样队列会很大线程数只会保持在corePoolSize大小当任务陡增时不能新开线程来执行响应时间会随之陡增。 maxPoolSize (max(tasks)- queueCapacity)/(1/taskcost) 计算可得 maxPoolSize (1000-80)/10 92 最大任务数-队列容量/每个线程每秒处理能力 最大线程数 rejectedExecutionHandler 根据具体情况来决定任务不重要可丢弃任务重要则要利用一些缓冲机制来处理 keepAliveTime和allowCoreThreadTimeout 采用默认通常能满足 以上都是理想值实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了则需要通过升级硬件呵呵和优化代码降低taskcost来处理。
九 真实环境实践 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置** author oldlu**/
Configuration
public class ThreadPoolConfig {// 核心线程池大小Value(${threadPoolConfig.corePoolSize})private int corePoolSize;// 最大可创建的线程数Value(${threadPoolConfig.maxPoolSize})private int maxPoolSize;// 队列最大长度Value(${threadPoolConfig.queueCapacity})private int queueCapacity;// 线程池维护线程所允许的空闲时间Value(${threadPoolConfig.keepAliveSeconds})private int keepAliveSeconds;// 线程池对拒绝任务(无线程可用)的处理策略Value(${threadPoolConfig.rejectedExecutionHandler})private String rejectedExecutionHandler;Bean(name threadPoolTaskExecutor)ConditionalOnProperty(prefix threadPoolTaskExecutor, name enabled, havingValue true)public ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setMaxPoolSize(maxPoolSize);executor.setCorePoolSize(corePoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);RejectedExecutionHandler handler;if (rejectedExecutionHandler.equals(CallerRunsPolicy)) {handler new ThreadPoolExecutor.CallerRunsPolicy();} else if (rejectedExecutionHandler.equals(DiscardOldestPolicy)) {handler new ThreadPoolExecutor.DiscardOldestPolicy();} else if (rejectedExecutionHandler.equals(DiscardPolicy)) {handler new ThreadPoolExecutor.DiscardPolicy();} else {handler new ThreadPoolExecutor.AbortPolicy();}executor.setRejectedExecutionHandler(handler);return executor;}/*** 执行周期性或定时任务*/Bean(name scheduledExecutorService)protected ScheduledExecutorService scheduledExecutorService() {return new ScheduledThreadPoolExecutor(corePoolSize,new BasicThreadFactory.Builder().namingPattern(schedule-pool-%d).daemon(true).build()) {Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);Threads.printException(r, t);}};}
}
十 个人总结
Java-如何合理的设置线程池大小 想要合理配置线程池线程数的大小需要分析任务的类型任务类型不同线程池大小配置也不同。
配置线程池的大小可根据以下几个维度进行分析来配置合理的线程数
任务性质可分为CPU密集型任务IO密集型任务混合型任务。 任务的执行时长。 任务是否有依赖——依赖其他系统资源如数据库连接等。
CPU密集型任务普通计算 尽量使用较小的线程池一般为CPU核心数1。 因为CPU密集型任务使得CPU使用率很高若开过多的线程数只能增加上下文切换的次数因此会带来额外的开销。
IO密集型任务文件数据库操作 可以使用稍大的线程池一般为2*CPU核心数1。 因为IO操作不占用CPU不要让CPU闲下来应加大线程数量因此可以让CPU在等待IO的时候去处理别的任务充分利用CPU时间。
混合型任务 可以将任务分成IO密集型和CPU密集型任务然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大那么就会比串行执行来的高效。 因为如果划分之后两个任务执行时间相差甚远那么先执行完的任务就要等后执行完的任务最终的时间仍然取决于后执行完的任务而且还要加上任务拆分与合并的开销得不偿失
依赖其他资源 如某个任务依赖数据库的连接返回的结果这时候等待的时间越长则CPU空闲的时间越长那么线程数量应设置得越大才能更好的利用CPU。
借鉴别人的文章 对线程池大小的估算公式 最佳线程数目 线程等待时间线程CPU时间/线程CPU时间 * CPU数目比如平均每个线程CPU运行时间为0.5s而线程等待时间非CPU运行时间比如IO为1.5sCPU核心数为8那么根据上面这个公式估算得到((0.51.5)/0.5)*832。
可以得出一个结论
线程等待时间所占比例越高需要越多线程。线程CPU时间所占比例越高需要越少线程。