当前位置: 首页 > news >正文

wordpress建立个人网站ps免费模板网站

wordpress建立个人网站,ps免费模板网站,网站显示图片标记,wordpress公众号文章分类文章目录 Java 线程池概述ThreadPoolExecutor 构造方法线程池拒绝策略工作流程并发库中的线程池CachedThreadPoolFixedThreadPoolSingleThreadExecutorScheduledThreadPool ThreadPoolExecutor 源码分析线程池状态表示获取 runState获取 workerCount生成 ctl 提交任务 execute(… 文章目录 Java 线程池概述ThreadPoolExecutor 构造方法线程池拒绝策略工作流程并发库中的线程池CachedThreadPoolFixedThreadPoolSingleThreadExecutorScheduledThreadPool ThreadPoolExecutor 源码分析线程池状态表示获取 runState获取 workerCount生成 ctl 提交任务 execute()为什么需要二次检查创建工作线程 addWorker() 工作线程 Worker主逻辑 runWorker获取任务 getTask() 工作线程的退出RUNNING 状态所有任务执行完成shutdown 关闭线程池所有线程等待新任务所有线程繁忙队列中剩余少量的任务 写在最后 Java 线程池概述 Java 语言中创建线程看上去就像创建一个对象仅仅调用 new Thread() 即可但实际上创建线程比创建对象复杂得多。创建对象仅仅是在 JVM 的堆里分配一块内存而创建线程需要 调用操作系统内核的 API然后操作系统要为线程分配一系列的资源这个成本很高。因此线程是一个重量级的对象应该避免频繁的创建和销毁。 线程池没有采用一般的池化资源设计方法例如连接池、对象池因为我们无法获取一个启动的 Thread 对象然后动态地将需要执行的任务 Runnable task 提交给线程执行。目前业界地线程池设计普遍采用生产者-消费者模型线程池的使用方为 Producer而线程池中的 工作线程为 Consumer。 ThreadPoolExecutor 构造方法 Java 提供的线程池相关的工具类中最核心的是ThreadPoolExecutor。ThreadPoolExecutor 的构造函数如下 corePoolSize线程池保有的最小线程数核心线程数。如果线程池中的线程数小于 corePoolSize提交任务时会创建一个核心线程该任务作为新创建的核心线程第一个执行的任务。maximumPoolSize最大线程数。如果提交任务时任务队列已经满了且当前工作线程数小于 maximumPoolSize会创建新的工作线程用于执行该任务反之如果工作线程数大于等于 maximumPoolSize则执行拒绝策略。keepAliveTime unit一个线程如果在时间单位为 unit 的 keepAliveTime 时间内没有执行任务而且线程池的线程数大于 corePoolSize 那么这个空闲的线程就要被回收。workQueue任务队列为 BlockingQueue 实现类。threadFactory线程工厂通过该参数可以自定义如何创建线程。ThreadFactory 是一个接口里面是有一个 newThread 方法等待实现Thread newThread(Runnable r);//接口方法默认为public abstracthandler任务的拒绝策略如果线程池中 所有的线程都在忙碌且任务队列已经满了前提是任务队列是有界队列此时提交任务线程池会拒绝执行。决绝的策略可以通过该参数指定。 温馨提示线程池的静态工厂类 Executors 提供了很多开箱即用的线程池可以帮助快速创建线程池但提供的线程池很多使用的是 无界队列 LinkedBlockingQueue无界队列很容易导致 OOM而 OOM 会导致所有请求都无法处理。 在阅读完本节后我们知道在生产环境中使用线程池时需要设置 ThreadPoolExecutor 构造方法的 workQueue 参数为 ArrayBlockingQueue 等有界阻塞队列。 线程池拒绝策略 上一小节提到构造方法中的 RejectedExecutionHandler handler 参数可以用于自定义任务拒绝策略。ThreadPoolExecutor 已经提供了 4 种拒绝策略 CallerRunsPolicy提交任务的线程自己去执行该任务。 AbortPolicy默认的拒绝策略会抛出 RejectedExecutionException。 DiscardPolicy直接丢弃任务没有任何异常抛出。 DiscardOldestPolicy丢弃最老的任务其实就是把最早进入工作队列的任务丢弃然后把新任务加入到工作队列。 默认拒绝策略为 AbortPolicy该拒绝策略抛出 RejectedExecutionException 为运行时异常编译器不会强制 catch开发人员可能会忽略因此默认拒绝策略要慎重使用。 如果线程池处理的任务非常重要建议自定义自己的拒绝策略并且在实际工作中 自定义的拒绝策略往往和 降级策略 配合使用。 例如将任务信息插入数据库或者消息队列配置 XXL-JOB 定时任务扫描失败任务表将执行失败的任务交给专用于补偿的线程池去进行补偿。 工作流程 线程池中有几个重要的概念核心线程池CorePool、**空闲线程池IdlePool**以及 任务队列。下图为我绘制的线程池工作流程图包含上述三个概念模型cpSize 核心线程池中当前的线程数、cpCap 核心线程池容量、ipSize 空闲线程池中当前线程数。 我来简述下提交任务 task 时线程池的执行流程 如果核心线程池未满即 cpSize 小于 cpCap通过线程工厂 创建一个核心线程将 task 作为新线程的第一个任务。 如果 核心线程池已满但是任务队列仍然有空间将 task 添加到任务队列。核心线程在执行完手头的任务后会从任务队列中获取新的任务继续执行。如果任务队列为空核心线程会阻塞在任务获取阶段直到有 新的任务提交到任务队列。 如果任务队列已满则创建空闲线程并将 task 作为第一个执行的任务。空闲线程如果执行完手头的任务也会从任务队列中获取新的任务。 如果任务队列为空空闲线程会阻塞直到 超出 keepalive 设定的时间 或 获取到新的任务执行。如果等待新任务超时空闲线程的生命周期就会结束了。 如果空闲线程数核心线程数已经达到了 maximumPoolSize创建新线程的方法会失败此时提交的任务将被拒绝拒绝策略由 RejectedHandler 负责执行。 并发库中的线程池 java.util.concurrent.Executors 提供了通用线程池创建方法去创建不同配置的线程池该工具类目前提供了五种不同的线程池创建配置 CachedThreadPool CachedThreadPool 是一种用来 处理大量短时间工作任务的线程池会在先前构建的线程可用时重用已创建的工作线程但是当工作线程空闲超过 60s则会从线程池中移除。 任务队列为 SynchronousQueue它是一个不存储元素的阻塞队列容量 0提交任务的操作必须等待工作线程的移除操作反之亦然。 为什么使用 SynchronousQueue 作为任务队列 个人想法线程池的工作逻辑是提交任务时如果 核心线程数达到 corePoolSize 且任务队列已满则会创建空闲线程执行。因为 SynchronousQueue 容量为 0 天然是满的且 corePoolSize 被设置为 0这意味着创建任务时如果没有可用线程就会立即创建一个新线程来处理任务。 这使得 CachedThreadPool 在执行大量短期异步任务时更加高效避免了任务对线程资源的等待符合设计初衷快速执行大量的短暂任务。 FixedThreadPool 核心线程数和最大线程数相等使用的是 无界任务队列 LinkedBlockingQueue。如果当前的工作线程数已经达到 nThreads任务将被添加到任务队列中等待执行。如果有工作线程退出下一次提交任务时将会有新的工作线程被创建来补足线程池。 SingleThreadExecutor 工作线程限制为 1 的 FixedThreadExecutor它 保证了所有任务的都是被顺序执行。 ScheduledThreadPool ScheduledThreadPoolExecutor 允许安排一个任务在延迟指定时间后 执行还可以 周期性地执行任务。周期性调度任务有两种类型固定延迟和固定频率。固定延迟 是在上一个任务结束和下一个任务开始之间保持固定的延迟而 固定频率 是以固定的频率执行任务不管任务的执行时间多长。 ScheduledThreadPoolExecutor 中定义了内部类 DelayedWorkQueue 作为任务队列DelayedWorkQueue 是基于堆的数据结构。队列中的元素为 RunnableScheduledFuture 类型 private RunnableScheduledFuture?[] queue new RunnableScheduledFuture?[INITIAL_CAPACITY];RunnableScheduledFuture 接口继承关系如下图所示 Delayed 接口继承了 Comparable 接口getDelay 方法返回任务剩余的延迟时间返回值小于等于 0 说明延迟的时间已过compareTo 方法用于比较任务下一次的执行时间用于维护小顶堆属性父节点任务的执行时间小于儿子节点。RunnableFuture 接口的 run 方法定义了需要执行的任务逻辑Future 接口用于获取异步任务的执行结果。 DelayedWorkQueue#take 方法用于获取下一个需要执行的定时任务代码及详细注释如下 public RunnableScheduledFuture? take() throws InterruptedException {final ReentrantLock lock this.lock;// 上锁, 避免堆数据访问产生的数据竞争lock.lockInterruptibly();try {for (;;) {// 堆顶元素: Delayed#getDelay 延迟时间最小的任务RunnableScheduledFuture? first queue[0];if (first null)// 堆中任务空, 等待新任务入堆available.await();else {long delay first.getDelay(NANOSECONDS);if (delay 0)// delay小于等于0, 说明延迟时间已过, 可以执行;// finishPoll 弹出堆顶任务return finishPoll(first);first null; // dont retain ref while waitingif (leader ! null)// leader为等待堆顶任务到达执行时间的线程// leader 非空说明已经有线程正在等待堆顶任务可执行, 因此当前线程为 follower, 需要等待直到堆顶元素变更available.await();else {// 当前线程是等待堆顶元素的 leader 线程, 设置 leader 属性Thread thisThread Thread.currentThread();leader thisThread;try {// 等待任务延迟的时间available.awaitNanos(delay);} finally {// await期间会释放锁, leader可能因为新任务的加入而失效(当前线程可能等待的不再是堆顶任务)// 所以await超时后, 需要判断leader是否为当前线程, 为当前线程才能设为nullif (leader thisThread)leader null;}}}}} finally {// 任务队列非空, leader为空说明没有线程等待堆顶元素可执行, 此时唤醒 follower 线程, 尝试获取堆顶的任务if (leader null queue[0] ! null)available.signal();lock.unlock();} }ThreadPoolExecutor 源码分析 线程池状态表示 ThreadPoolExecutor 最重要的状态参数为线程池状态(rs) 以及 活跃线程数(wc)。ThreadPoolExecutor 使用一个 Integer 变量 ctl 存储这两个状态参数 private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));Integer 长度位 32 bitsctl 中最高的三位 (29-31) 存储线程池状态低 29 位 (0~28) 存储活跃线程数因此线程池中活跃线程数理论上限为 2 29 − 1 2^{29}-1 229−1。 了解了 ThreadPoolExecutor 的这种设计之后我们来看看状态相关的位运算 private static final int COUNT_BITS Integer.SIZE - 3; private static final int CAPACITY (1 COUNT_BITS) - 1;// runState is stored in the high-order bits private static final int RUNNING -1 COUNT_BITS; private static final int SHUTDOWN 0 COUNT_BITS; private static final int STOP 1 COUNT_BITS; private static final int TIDYING 2 COUNT_BITS; private static final int TERMINATED 3 COUNT_BITS;CAPACITY 表示线程池中活跃线程的理论上限 2 29 − 1 2^{29}-1 229−1COUNT_BITS 表示线程数位数 32 − 3 29 32-329 32−329。 RUNNING 、SHUTDOWN、STOP、TIDYING、TERMINATED 为线程池的五种状态。根据代码这五种状态表示如下图所示 RUNNING可接收新的任务并且处理队列中排队的任务SHUTDOWN不接收新的任务但会处理队列中剩下的任务STOP不接收新任务不处理队列中的任务并且中断进行中的任务TIDYING所有的任务终止工作线程数 (workerCount) 等于 0TERMINATED线程池关闭terminated() 方法完成。 获取 runState private static int runStateOf(int c) { return c ~CAPACITY; }runStateOf 方法从 ctl 获取线程池运行状态保留 ctl 的最高的三位其余位设置为 0。以 STOP 状态、3 个活跃线程数的 ctl 为例求 rs 的过程如下 获取 workerCount private static int workerCountOf(int c) { return c CAPACITY; }workerCountOf 获取线程池中的活跃线程数即保留 ctl 的 0-28 位将 29-31 位设置为 0。 生成 ctl private static int ctlOf(int rs, int wc) { return rs | wc; }ctlOf 通过状态值和线程数值计算出 ctl就是对 rs 和 wc 进行或运算保留 wc 的 0-28 位和 rs 的 29-31 位。 提交任务 execute() ThreadPoolExecutor#execute 方法用于提交任务给线程池执行代码以及详细注释如下 public void execute(Runnable command) {if (command null)throw new NullPointerException();// 获取线程池状态参数 ctlint c ctl.get();// 如果活跃线程数小于核心线程池容量corePoolSize, addWorker创建新线程, 以command作为第一个任务if (workerCountOf(c) corePoolSize) {if (addWorker(command, true))return;// 创建新线程失败, 更新 ctlc ctl.get();}// 创建核心线程失败, 尝试将任务添加到任务队列中if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();// 二次检查, 如果线程池不在运行状态, 需要回滚刚刚入队的任务if (!isRunning(recheck) remove(command))// 移除任务成功, 执行拒绝策略reject(command);else if (workerCountOf(recheck) 0)// 线程池为运行状态, 但是没有工作线程, 创建线程处理任务队列中的任务addWorker(null, false);}// 任务添加到队列失败, (1)线程池状态不是RUNNING状态 或 (2)任务队列已满// 尝试增加非核心线程, 执行 command 任务, 如果线程池不为RUNNING, addWorker会返回falseelse if (!addWorker(command, false))// 线程池不为RUNNING, 新增非核心线程失败, 执行任务拒绝策略reject(command);}这段代码的主要逻辑很简洁 当 wc 小于 corePoolSize 时创建核心线程执行 command 任务如果核心线程数已满则将任务缓存在任务队列中 (workQueue.offer)工作线程完成手头上的任务后从任务队列中获取新任务。如果任务队列也满了offer 方法返回 false尝试增加非核心线程执行 command。如果线程创建失败reject 执行任务拒绝策略。 除此之外我想在本篇博客中探讨下 execute 方法的一些实现细节并给出我自己的观点用于抛砖引玉。 为什么需要二次检查 大家请看下面这段有关二次检查的代码在阅读源码时我产生了疑问 为什么需要二次检查 该操作 解决了什么场景下的数据竞争 // ... c ctl.get(); // -------(1) if (isRunning(c) workQueue.offer(command)) { // -------(2)int recheck ctl.get(); // -------(3)if (!isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0)addWorker(null, false); } // ...执行二次检查的前提是 线程池在执行语句 (1) 的时候是运行状态任务队列未满command 被添加至队列(2) 处的 offer 方法返回 true 假设没有二次检查会发生什么 场景 1在语句 (1) 后语句(2) 执行前线程池使用者调用了 shutdownNow 方法将线程池工作线程关闭清空任务队列中的任务。时序图如下 我们先来看看 shutdownNow 调用的 drainQueue 方法 public ListRunnable shutdownNow() {ListRunnable tasks;final ReentrantLock mainLock this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }private ListRunnable drainQueue() {BlockingQueueRunnable q workQueue;ArrayListRunnable taskList new ArrayListRunnable();q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList; }drainQueue 方法用于移除任务队列 workQueue 中的 Runnable 任务这些未执行的任务作为 shutdownNow 方法的返回值通知方法调用者哪些任务未执行。 如果按照上图中的执行序列线程池的状态已经为 STOP任务队列也被清空但是新提交的任务 command 却被添加到任务队列中。这导致这个新任务不会被运行、也不会执行拒绝策略、也无法通过 shutdownNow 返回的任务列表通知调用者。 这严重降低了线程池的健壮性难以想象一个已提交的任务消失在线程池中 场景 2线程池处于运行状态corePoolSize 设置为 0阻塞队列的容量大于 0。 线程池刚启动时提交任务 command 显然无法创建核心线程执行任务会被缓冲在任务队列中直到任务队列容量到达上限线程池才会创建非核心线程执行任务。这导致 大量任务将不能及时被处理甚至可能永远得不到执行 场景示意图如下图中任务队列容量为 4corePoolSize 等于 0 二次检查解决了上述两种场景的问题吗当然 c ctl.get(); // -------(1) if (isRunning(c) workQueue.offer(command)) { // -------(2)int recheck ctl.get(); // -------(3)if (!isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0) // -----(4)addWorker(null, false); }针对场景 1 如果在语句 (1) 和 语句(2) 之间shutdownNow 被调用并执行完成然后语句 (2) 将新任务 command 加入任务队列。在语句 (3) 重新获取最新的 ctl此时就能得知线程池的状态已经为 STOP使用 remove 方法回滚入队的任务并执行 reject 方法拒绝执行任务 。 针对场景 2 如果线程池状态为 RUNNING但因为线程中没有线程语句(4) 判断为 true创建非核心线程处理任务队列中的任务防止异步任务长时间处于队列中得不到处理 的情况。 创建工作线程 addWorker() addWorker 用于创建工作线程我将其分为两部分分析 第一部分根据外层死循环判断 ThreadPoolExecutor 的运行状态 是否能够创建线程。如果可以创建线程通过内层死循环 CAS 更新状态参数 ctl直到更新成功或线程池状态发生改变。 第一部分的含详细注释的代码如下 private boolean addWorker(Runnable firstTask, boolean core) {// retry为外层循环retry:for (;;) {int c ctl.get();int rs runStateOf(c);// 仅 (1) RUNNING状态 或 (2) SHUTDOWN状态队列中仍有任务firstTask为空 时 创建工作线程// firstTask为空, 说明活跃线程数不满足线程池运行的最小数量if (rs SHUTDOWN ! (rs SHUTDOWN firstTask null ! workQueue.isEmpty()))return false;// for内层循环for (;;) {int wc workerCountOf(c);// 如果线程数达到容量上限, 不可创建新线程// 如果core为true, 线程数大于等于corePoolSize, 不能创建核心线程// 如果 core 为 false, 线程数大于等于 maximumPoolSize, 不可以创建非核心线程if (wc CAPACITY ||wc (core ? corePoolSize : maximumPoolSize))return false;// CAS更新 ctl, 如果成功, 则退出 retry 循环, 执行创建流程if (compareAndIncrementWorkerCount(c))break retry;// CAS更新失败, 重新读取 ctlc ctl.get();if (runStateOf(c) ! rs)// 状态发生改变, 重新执行大循环continue retry;// else: 线程数改变导致CAS失败, 继续for循环即可}}// ... 省略第二部分 } 第二部分状态更新成功后执行真正的线程创建逻辑包括工作线程添加至 Worker 集合、启动 Thread 对象。 第二部分详细代码注释如下 private boolean addWorker(Runnable firstTask, boolean core) {// ... 省略第一部分boolean workerStarted false;boolean workerAdded false;Worker w null;try {w new Worker(firstTask);final Thread t w.thread;if (t ! null) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 持有锁的情况下获取 ctl, 防止 shutdown、shutdownNow 导致的状态变更int rs runStateOf(ctl.get());// 运行状态为 RUNNING或运行状态为 SHUTDOWN 且 firstTask为空 才允许启动工作线程if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {// 线程可能已经启动, 抛出异常(例如: 自定义的ThreadFactory#newThread 方法多次调用返回同一个 Thread 对象)if (t.isAlive())throw new IllegalThreadStateException();workers.add(w); // 添加到 HashSet 中int s workers.size();if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 成功添加到 workers 集合, 在这里真正启动工作线程t.start();workerStarted true;}}} finally {if (! workerStarted)// 启动线程失败(可能线程已经启动 或 线程池状态发生改变), 将worker从workers中移除, 扣减 workerCountaddWorkerFailed(w);}return workerStarted; }大部分代码阅读注释即可了解原理这里提一下我阅读时产生疑惑的地方 疑惑一firstTask 等于 null 代表什么为什么判断能否创建线程时处于 SHUTDOWN 状态还需要 firstTask 等于 null if (rs SHUTDOWN ! (rs SHUTDOWN firstTask null ! workQueue.isEmpty()))return false;疑惑二为什么需要在持有 mainLock 后需要重新检查运行状态 rs 先来看疑惑一firstTask 等于 null 出现的场景有 预启动核心线程所有包含 prestart 单词的方法 public boolean prestartCoreThread() {return workerCountOf(ctl.get()) corePoolSize addWorker(null, true); }public int prestartAllCoreThreads() {int n 0;while (addWorker(null, true))n;return n; }在工作线程退出时替换死亡的工作线程。processWorkerExit 方法 private void processWorkerExit(Worker w, boolean completedAbruptly) {// ...int c ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min allowCoreThreadTimeOut ? 0 : corePoolSize;if (min 0 ! workQueue.isEmpty())min 1;if (workerCountOf(c) min)return; // replacement not needed}addWorker(null, false);} }提交的新任务被缓冲在队列但活跃线程数 workCount 等于 0。(execute方法) 在 addWorker 方法中Running 状态可以创建工作线程SHUTDOWN 状态仅可以在 firstTask 等于 null 的条件下创建线程。这符合 SHUTDOWN 状态的设计初衷不接受新的任务、仅处理已添加至阻塞队列中的任务。 除了预启动场景execute 场景和 processWorkerExit场景 均是为了确保已经添加到任务队列中的任务不被放弃能够成功执行。 再来看疑惑二为什么在持有 mainLock 的情况下获取运行状态 rs 这是为了防止 shutdown、shutdownNow 方法关闭线程池改变运行状态。 为了确保 shutdown 和 shutdownNow 方法执行时 worker 集合的稳定从而保证方法执行过程的原子性这两种方法都会 在持有 mainLock 的情况下修改 runState。 因此如果创建 worker 时 rs 发生了改变从而不应该增加工作线程应该退出创建流程。例如 RUNNING 变为 STOP 状态此时不应该创建线程因为任务都被丢弃了。 mainLock.lock();int rs runStateOf(ctl.get());// 确保运行状态 rs 可以创建新的线程if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {// ...} mainLock.unlock();下面是我绘制的 addWorker 工作流程图作为本小节的总结 工作线程 Worker ThreadPoolExecutor 中的线程资源被包装为 Worker 对象它持有一个 Thread 对象实现了 Runnable 接口又继承了 AQS因此也具有锁的性质。 需要指出的是它没有利用 AQS 中的 CLH 队列管理等待资源的线程因为 Worker 并 不存在多个线程争抢所有权它的 lock 方法仅由内部持有的 线程调用。 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // AQS state 属性初始化为 -1this.firstTask firstTask;this.thread getThreadFactory().newThread(this);}// 线程的执行逻辑就是 runWorker方法public void run() {runWorker(this);}// runWorker方法中, 线程在执行任务前持有锁, 将state更改为 1public void lock() { acquire(1); }// shutdown 关闭空闲线程时, 使用 tryLock 尝试获取锁 public boolean tryLock() { return tryAcquire(1); }// 任务执行完成释放锁, state更改为 0public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }// ... }AQS 在 Worker 中的主要作用是维护 state 属性。Worker 构造函数中state 初始化为 -1执行 runWorker() 方法时会被设置为 0。state 等于 0 说明线程是空闲的state 等于 1 说明线程正在处理任务。 Worker#lock() 方法在仅在 runWorker 方法中被调用线程在执行任务前调用该方法持有锁将state更改为 1。Worker#unlock() 方法在执行完任务后被调用释放锁将 state 更改为 0。Worker#tryLock() 方法在 shutdown() 方法中被调用用于中断空闲的工作线程因为空闲的 Worker state 等于 0因为 tryLock 能返回 true。 主逻辑 runWorker runWorker 代码及详细注释如下 final void runWorker(Worker w) {Thread wt Thread.currentThread();// 工作线程的第一个任务, 创建核心线程 或 线程池已满创建非核心线程时, firstTask非空Runnable task w.firstTask;w.firstTask null;// 将 state 由初始值 -1 修改为 0w.unlock(); // allow interruptsboolean completedAbruptly true;try {// getTask如果为空, 说明任务队列中已经没有任务可以执行, 工作线程正常退出while (task ! null || (task getTask()) ! null) {w.lock(); // 在执行任务前, 清除线程的中断标记(较为费解, 随后详细解释)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {// 执行任务前的钩子方法, 继承ThreadPoolExecutor的类可重写beforeExecute(wt, task);Throwable thrown null;try {// 执行任务task.run(); } catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {// 执行任务完成后的钩子方法, 继承ThreadPoolExecutor的类可重写afterExecute(task, thrown);}} finally {task null;w.completedTasks;w.unlock(); // state修改为0, 工作线程空闲}}completedAbruptly false;} finally {// 处理线程退出:// 1. 从 worker 集合中移除当前工作线程// 2. 如果活跃线程数不满足线程池运行的最低要求, 或者线程因为执行异常而终止, 创建新线程替换processWorkerExit(w, completedAbruptly);}}工作线程的运行流程概括起来为 getTask 从线程池中获取 Runnable 任务按照 beforeExecute、Runnable#run、afterExecute 的顺序执行beforeExecute 和 afterExecute 为 ThreadPoolExecutor 提供的两个扩展点子类可以重写这两个方法满足打点、日志等自定义需求。如果任务顺利执行进行下一轮循环通过 getTask 获取新任务。 如果 getTask 返回 null说明任务队列中没有任务 或者 当前线程因为线程池关闭而被中断。如果任务 或 钩子函数执行时抛出了异常线程同样会退出completedAbruptly 为 true。 在讲解完工作线程的主要流程后我们来讨论下面这个 if 语句的含义 Thread wt Thread.currentThread(); w.lock(); if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt(); // ...这段代码执行的目的是 工作线程 worker 已经领取了一个任务准备执行如果线程池状态为 RUNNING 或 SHUTDOWN应该确保当前线程的中断标记被清除从而不影响任务的执行。Thread.interrupted() 方法会 返回当前线程的中断标记并将线程中断标记清空。 如果线程池的状态为 STOP且当前线程未被中断wt.interrupt() 为当前线程打上中断标记。 下面我来分类讨论帮助大家更好的理解 runStateAtLeast(ctl.get(), STOP) true !wt.isInterrupted() true 当前线程池的状态至少为 STOP当前线程却没有中断标记。if 判断为 true中断当前线程(runStateAtLeast(ctl.get(), STOP) false、Thread.interrupted() true runStateAtLeast(ctl.get(), STOP) false。 if 判断为 false当前线程的状态为 RUNNING 或 SHUTDOWN且已经有一个即将执行的任务Thread.interrupted() 将中断标记清除。(runStateAtLeast(ctl.get(), STOP) false、Thread.interrupted() true runStateAtLeast(ctl.get(), STOP) true、!wt.isInterrupted() true。 这种情况非常反直觉但是有可能出现的。下图操作序列很好说明了这种情况因为 错误地将 STOP 中断标记给清除所以 if 也会判断为 true执行 wt.interrupt() 中断当前线程。 (runStateAtLeast(ctl.get(), STOP) false、Thread.interrupted() false runStateAtLeast(ctl.get(), STOP) true 这种情况类似上一种只是线程池状态设置为 STOP还未中断当前线程if 操作会返回 false。 获取任务 getTask() 工作线程通过 getTask 从任务队列中获取任务如果 getTask 返回 null线程就会退出 runWorker 中的死循环。 getTask 何时返回 null 条件一线程池状态为STOP、TIDYING、TERMINATED或者是 SHUTDOWN且工作队列为空。 条件二工作线程 wc 大于最大线程数或当前工作线程已经超时 且还有其他工作线程或任务队列为空。 当前线程超时的条件【核心线程可以超时】或【线程数大于核心线程数】且 上一轮循环从阻塞队列的 poll 方法超时返回。 private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (;;) {int c ctl.get();int rs runStateOf(c); // rs保留c的高3位, 低29位全部清零// 大小顺序为TERMINATED TIDYING STOP SHUTDOWN RUNNING// 条件一if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;}int wc workerCountOf(c);// timed表示当前线程是否能够超时(设置了【核心线程超时】或线程数超过了核心线程)boolean timed allowCoreThreadTimeOut || wc corePoolSize;// 条件二if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {// 可能有多个线程同时满足条件二, 需要使用cas扣减if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();if (r ! null)return r;timedOut true; // poll取任务超时, timedOut设置为true} catch (InterruptedException retry) {timedOut false;}} }getTask 的流程图如下 随后我将在【工作线程的退出】章节详细介绍 不同场景线程池回收工作线程的过程 会结合 getTask 方法分析。 工作线程的退出 RUNNING 状态所有任务执行完成 这种场景下会将工作线程的数量减少到核心线程数大小。 int wc workerCountOf(c); boolean timed allowCoreThreadTimeOut || wc corePoolSize;if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue; }timed 表示是否允许线程因为超时被回收timedOut 记录上一轮循环中线程从阻塞队列获取任务是否超时了。 假设线程池核心线程数为2最大线程数为4。线程数低于核心线程数时使用execute 提交任务便会创建核心线程线程数达到 2 后任务被添加至阻塞队列如果阻塞队列也满了将工作线程逐渐增加到 4。当全部任务执行完成后 工作队列为空四个线程阻塞在 workQueue.poll 上各自等待 keepAliveTime 时间后超时返回timedOut 设置为 true。 进入下一轮循环因为 wc 等于 4 大于 corePoolSize2因此四个线程 timed 均为 true从而 timedtimedOut 为 true 且 当前任务队列为空情况二成立4 个线程都可以被超时回收。 四个线程尝试 CAS 扣减 wc 为 3仅有一个线程能扣减成功getTask 返回 null。其余三个线程继续循环直到线程数达到核心线程数timed 等于 false。 shutdown 关闭线程池 调用 shutdown() 后线程池状态流转为 SHUTDOWN随后向所有的空闲工作线程发送中断信号。 public void shutdown() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers(); // 中断所有空闲线程onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }处于 getTask 获取任务阶段的工作线程是空闲的并没有锁定 Worker。我将分三种情况探讨工作线程如何响应中断信号。 任务全部完成所有线程在等待任务队列中积压了大量任务所有线程在繁忙队列中剩余的任务少于空闲线程数 所有线程等待新任务 // getTask(): 条件一 if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null; } ... try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r ! null)return r;timedOut true; // poll取任务超时, timedOut设置为true } catch (InterruptedException retry) {timedOut false; }中断信号将阻塞的线程唤醒进入下一轮循环。当到达条件一处检查到 rs 等于SHUTDOWN且工作队列为空满足条件扣减线程数后返回null。在runWorker 中退出循环结束线程。 所有线程繁忙 此时任务队列中积压了很多任务工作线程因为 shutdown 而被中断在获取任务时 调用 poll 或 take 方法都会抛出 InterruptedException 异常然后被 catch 捕获重新进行循环。 第二次循环到达条件一虽然 rs 为 SHUTDOWN但是工作队列非空不满足退出条件。 // 工作队列非空, 条件1不满足 if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {decrementWorkerCount();return null; }timedOut 为 false不是因为 poll 超时而返回因此条件 2 也不满足 // timedOut false if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue; }因此shutdown 方法在线程池繁忙的情况下相当于让 正在获取任务的线程空转了一次不影响线程池运行。 队列中剩余少量的任务 假设情形 线程池状态已经是SHUTDOWN但任务队列中剩余两个任务A、B、C、D四个线程同时通过条件一和条件二尝试从阻塞队列中获取任务。线程A、B成功获取任务而线程 C、D因队列为空而阻塞。 线程A、B执行完任务后再次调用 getTask()条件一的判断为true(线程池运行状态为SHUTDOWN且工作队列为空)于是返回 null线程退出 runWorker 死循环准备进行回收。 final void runWorker(Worker w) {boolean completedAbruptly true;try {while (task ! null || (task getTask()) ! null) {...}}finally {// 回收退出的线程processWorkerExit(w, completedAbruptly); }在回收前还需要执行 processWorkerExit 方法。在该方法中会将 worker 移除出 worker 集合并调用tryTerminate()。 private void processWorkerExit(Worker w, boolean completedAbruptly) {// 执行任务时抛出异常退出, 而非getTask()返回null退出, 需要更新ctl属性反映线程数的变化if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock this.mainLock;mainLock.lock();try {completedTaskCount w.completedTasks; // 统计完成的任务数workers.remove(w); // 将Worker对象移除工作线程集合} finally {mainLock.unlock();}tryTerminate();... }在 tryTerminate 中线程A、B判断线程池状态为 SHUTDOWN 且工作队列为空不会在第一个 if 处返回。 然后判断出当前workers中的工作线程数不为0(因为线程C、D正阻塞)然后调用 interruptIdleWorkers(ONLY_ONE)。 注意此时线程A、线程B的线程数已经从ctl扣减Worker实例也从workers中移除。 final void tryTerminate() {for (;;) {int c ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) SHUTDOWN ! workQueue.isEmpty()))return;// 线程池状态为SHUTDOWN, 但仍然有线程阻塞在take或poll方法处if (workerCountOf(c) ! 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}...} }interruptIdleWorkers 的入参 onlyOne 为true因此只会中断一个空闲线程然后break循环。假设先中断线程C线程C从阻塞中被唤醒抛出InterruptedException异常被 catch 住异常后重新进行一轮循环发现条件一满足更新 ctl 并返回null。 private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t w.thread;// 正在执行任务的Worker是无法获取锁的, 因此这里只能回收空闲线程if (!t.isInterrupted() w.tryLock()) {try {t.interrupt(); } catch (SecurityException ignore) {} finally {w.unlock();}}// 仅中断一个空闲线程if (onlyOne)break;}} finally {mainLock.unlock();} }随后线程 D 可以由上一个退出的线程中断唤醒例如线程 C从而让工作线程优雅地退出。 写在最后 感谢各位读者阅读本片博客本篇博客的创作过程中参考了大量资料笔者也详细阅读了 ThreadPoolExecutor 源码。笔者将很多阅读源码的思考融入本篇博客尽可能去体会 Doug Lea 大神每一行代码的用意。这些细节可能很少有博客涉及因此很可能存在纰漏和理解错误。如果有异见欢迎在评论区指教笔者将虚心倾听。 创作过程耗时费力但我乐在其中钻研源码的过程和分享知识是让人快乐的事情如果大家喜欢这种图文结合、代码详细注释的写作风格就给我点一个免费的赞吧
http://www.zqtcl.cn/news/117728/

相关文章:

  • 微信制作网站设计重庆关键词优化软件
  • 网站的设计与应用论文平台推广计划书模板范文
  • 网站备案用户名忘了怎么办网站做301排名会掉
  • 厦门制作网站企业网站子域名怎么做
  • 青岛微网站开发品牌建设青之见
  • 淄博哪有培训做网站的湖南营销型网站建设企业
  • 动物网站建设深圳最好的营销网站建设公司
  • 各种网站制作陕西建设厅证件查询网站
  • 如何提高一个网站如何做简单网站
  • 游戏网站开发找什么人可建智慧园区设计方案
  • 重庆网站设计公司推荐福州移动网站建设
  • 移动网站功能做网站fjfzwl
  • 食品网站建设的目的中级经济师考试成绩查询
  • 普宁建设局网站免费的网站开发平台
  • 网站域名主机空间区别网站上传系统
  • 建设高端网站公司的目的淮南房产网
  • 网站建设 中山网站建设新得体会
  • 快速搭建网站视频教程看想看的做想做的电影网站好
  • 网站聊天怎么做2345网址导航智能主版
  • 如何优化网站加载速度做推广公司
  • 网站下载不了视频php网站 数据库链接
  • 制作网页网站教程wordpress建立扁平化
  • 网站建设小知识郑州网站建设找伟置
  • 苏中建设官方网站旅游做攻略用什么网站好
  • 信息门户网站制作wordpress改商城
  • 企业类网站有哪些甘肃省和住房建设厅网站
  • 嘉兴市住房和城乡建设局网站wordpress nodejs版本
  • 做网站 百度推广深圳外贸招聘
  • 网站留言板功能网站建设 核对流程
  • WordPress输出当前网址郑州官网seo厂家