深圳建设工程交易中心网站,企业邮箱163登录入口,北京建设部网站职称,开建筑公司取名字如何取好旺RunnableFuture
源码学习#xff1a;
成员变量 任务的运行状态的转化 package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/**可取消的异步计算。该类提供了Future的基本实现#xff0c;包括启动和取消计算的方法#xff0c;查询计算是否完成以…RunnableFuture
源码学习
成员变量 任务的运行状态的转化 package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/**可取消的异步计算。该类提供了Future的基本实现包括启动和取消计算的方法查询计算是否完成以及获取计算结果的方法。只有在计算完成后才能获取结果如果计算尚未完成get方法将会阻塞。一旦计算完成就无法重新启动或取消计算除非使用runAndReset方法调用计算。FutureTask可以用来包装一个Callable或Runnable对象。由于FutureTask实现了Runnable接口因此可以将FutureTask提交给Executor执行。 除了作为独立的类使用外该类还提供了一些受保护的功能这些功能在创建自定义任务类时可能会有用。*/
public class FutureTaskV implements RunnableFutureV {/**此任务的运行状态初始为NEW。运行状态仅在set、setException和cancel方法中转换为终态。在完成过程中状态可能会暂时变为COMPLETING在设置结果时或INTERRUPTING仅在中断执行者以满足cancel(true)时。从这些中间状态到最终状态的转换使用更便宜的有序/懒惰写入因为这些值是唯一的且不会进一步修改。* Possible state transitions:* NEW - COMPLETING - NORMAL 正常结束* NEW - COMPLETING - EXCEPTIONAL 异常结束* NEW - CANCELLED* NEW - INTERRUPTING - INTERRUPTED*/// 状态值 表示任务运行的状态private volatile int state;// 新建 或者 正在运行private static final int NEW 0;// 中间状态任务执行完了但是结果集正结果/ 异常 没有设置到 outcomeprivate static final int COMPLETING 1;// 正常执行完成结果集设置到outcome之后正常结束)private static final int NORMAL 2;// 异常执行完成结果集设置到outcome之后异常结束private static final int EXCEPTIONAL 3;// 取消private static final int CANCELLED 4;// 中断中间值[但是还没有中断]private static final int INTERRUPTING 5;// 中断完成最终状态private static final int INTERRUPTED 6;/** The underlying callable; nulled out after running */// 执行目标private CallableV callable;/** The result to return or exception to throw from get() */// 结果集private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads */// get 阻塞的时候使用 WaitNode{物理结构链表逻辑结构栈}去存储阻塞的线程private volatile WaitNode waiters;public FutureTask(CallableV callable) {if (callable null) throw new NullPointerException();this.callable callable;this.state NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable Executors.callable(runnable, result); // 适配器的方式this.state NEW; // ensure visibility of callable}public boolean isCancelled() {return state CANCELLED;}public boolean isDone() {return state ! NEW;}/*** 简单的链表节点用于记录Treiber堆栈中的等待线程。*/static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread Thread.currentThread(); }}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? k FutureTask.class;stateOffset UNSAFE.objectFieldOffset(k.getDeclaredField(state));runnerOffset UNSAFE.objectFieldOffset(k.getDeclaredField(runner));waitersOffset UNSAFE.objectFieldOffset(k.getDeclaredField(waiters));} catch (Exception e) {throw new Error(e);}}}run() 任务只能执行一次 public void run() {// 状态 是New 并且 cas 成功的把当前线程设置到 runner 才能执行后续的方法否则就直接返回if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {CallableV c callable;// 当前 要执行的任务存在并且 状态 是New 才会调用目标逻辑 c.call()if (c ! null state NEW) {V result;boolean ran;try {result c.call();// 目标逻辑执行成功ran true;} catch (Throwable ex) {// 目标逻辑执行 失败 结果 置为 nullresult null;ran false;// 设置异常结果集setException(ex);}if (ran)// 设置正常结果集set(result);}} finally {// 在任务状态被确定之前runner必须非空以防止对run()方法的并发调用。runner null;// 在将runner设置为null之后必须重新读取任务的状态以防止泄漏的中断。int s state;if (s INTERRUPTING) // 处于中断状态执行中断后续逻辑handlePossibleCancellationInterrupt(s);}
}FunctionalInterface
public interface CallableV {V call() throws Exception;
}/*将此Future的结果设置为给定的值除非此Future已经被设置或已取消。在计算成功完成时此方法由run方法在内部调用。
*/protected void set(V v) {// cas 的方式把状态变为 COMPLETING 设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 结果 result outcome v;// cas 的方式把状态变为最终状态: NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 执行后续操作finishCompletion();}}/*将此Future报告为ExecutionException将给定的throwable作为其原因除非此Future已经被设置或已取消。在计算失败时此方法由run方法在内部调用。
*/protected void setException(Throwable t) {// cas 的方式把状态变为 COMPLETING 设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 异常outcome t;// cas 的方式把状态变为最终状态: EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 执行后续操作finishCompletion();}}/*
确保任何来自可能的cancel(true)取消操作的中断仅在run或runAndReset方法中传递给任务的目的。
*/private void handlePossibleCancellationInterrupt(int s) {// 解释了在等待中断信号时使用自旋等待的目的: 通过中断来取消任务的执行。然而可能存在一种情况即中断线程在 有机会中断当前线程 之前被阻塞。为了等待中断信号的到来代码使用了自旋等待的策略。if (s INTERRUPTING)while (state INTERRUPTING) // 处于中间状态的时候就 让出 cpuThread.yield(); // wait out pending interrupt// assert state INTERRUPTED;// 解释了在state等于INTERRUPTED时的处理逻辑// 它使用断言assert来确保任务的状态为INTERRUPTED。断言通常用于在代码中插入一些检查以确保某些条件为真。如果断言的条件为假将会抛出一个AssertionError异常。// Thread.interrupted();// 解释了清除可能来自cancel(true)取消操作的中断的目的。但是中断也可以作为一个独立的机制用于任务与其调用者之间的通信并且没有办法只清除取消中断。因此为了清除中断代码调用了Thread.interrupted()方法。// Thread.interrupted()方法用于清除当前线程的中断状态并返回之前的中断状态。这样做的目的是确保任务的中断状态被清除以便后续的代码或操作不会受到中断的影响。}
finishCompletion() /*** 移除并唤醒所有等待的线程调用done()方法并将callable置为null。*/private void finishCompletion() {// assert state COMPLETING;// 循环获取等待队列中的等待节点 waiters 等待节点里面保存了等待任务完成的线程for (WaitNode q; (q waiters) ! null;) {// 要是cas 的方式成功的将等待队列 waitersOffset 设置为 null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 循环处理每一个等待的节点for (;;) {// 取出当前等待节点 持有的线程Thread t q.thread;// 线程存在if (t ! null) {// 将等待节点的线程引用设为null并调用LockSupport.unpark(t)方法来唤醒该线程q.thread null;LockSupport.unpark(t); //LockSupport.unpark(t)方法用于唤醒一个被阻塞的线程。}WaitNode next q.next; // 取节点下一个元素if (next null)// 要是没有后继节点此时表示已经处理完所有等待节点,退出 死循环break;// 将后继节点置为 nullq.next null; // unlink to help gc // 节点后移q next;}break; // (q waiters) null 退出循环}}done(); // 调用done()方法来完成任务的执行 —————— 钩子方法/*这段代码是一个保护protected方法当任务转换为已完成状态isDone时被调用无论是正常完成还是通过取消完成。默认实现不执行任何操作。子类可以重写这个方法来调用完成回调或进行记录。注意在该方法的实现中您可以查询状态来确定任务是否已被取消。protected void done() { }*/callable null; // to reduce footprint 将callable引用设为null以减少内存占用。}
runAndReset() 任务可以执行多次 和 run() 的区别就是 没有正常的结果设置结果集 /**执行计算但不设置其结果然后将该Future重置为初始状态。如果计算遇到异常或被取消则无法执行重置操作。这个方法设计用于那些本质上需要执行多次的任务。
*/// 执行并重置任务
protected boolean runAndReset() {if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran false;int s state;try {CallableV c callable;if (c ! null s NEW) {try {c.call(); // dont set resultran true;} catch (Throwable ex) {setException(ex);}/*和 run() 的区别就是 没有正常的结果设置结果集*/}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner null;// state must be re-read after nulling runner to prevent// leaked interruptss state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran s NEW;
}get() /*** 如果计算被取消 会抛出异常*/public V get() throws InterruptedException, ExecutionException {int s state;// 此时的状态只有 New 新建 或者 正在执行 或者 COMPLETING任务执行结束结果集该没有设置成功if (s COMPLETING)// 阻塞等待s awaitDone(false, 0L);// 否则的话 返回结果集正常 或者 异常return report(s);}/**等待方法根据传入的参数决定是否使用定时等待。如果使用定时等待则会在指定的时间内 等待完成 或者 在中断或超时时中止。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline timed ? System.nanoTime() nanos : 0L;WaitNode q null;boolean queued false;//[注] 带有阻塞的都要有 死循环防止虚假唤醒for (;;) { // 死循环if (Thread.interrupted()) { // 判断是不是中断removeWaiter(q); // 移除节点 throw new InterruptedException(); // 抛错} int s state;if (s COMPLETING) {// 此时结果设置成功// 当前节点存在将其持有的线程 置空if (q ! null)q.thread null;// 返回结果结束阻塞return s;}// 还是在设置结果的状态让出 cpu else if (s COMPLETING) // cannot time out yetThread.yield();// 处于 new 状态else if (q null)// 创建节点q new WaitNode();// 插入链表else if (!queued)// cas 的方式 头插法queued UNSAFE.compareAndSwapObject(this, waitersOffset,q.next waiters, q);// 超时等待else if (timed) {nanos deadline - System.nanoTime();// 时间已过移除结果返回状态if (nanos 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}/***完成任务之后 返回结果或者抛出异常*/SuppressWarnings(unchecked)private V report(int s) throws ExecutionException {Object x outcome;if (s NORMAL)return (V)x;if (s CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
removeWaiter(WaitNode node) /*** 尝试取消链接超时或中断的等待节点以避免累积垃圾。内部节点只是简单地取消拼接而无需使用CAS因为如果它们被释放器遍历这样做是无害的。为了避免从已经删除的节点中取消拼接的影响在明显存在竞争的情况下列表将被重新遍历。当节点很多时这会很慢但我们不希望列表足够长以抵消更高开销的方案。*/private void removeWaiter(WaitNode node) {if (node ! null) {// 当前节点存在// 将传入节点的线程引用置为null表示该节点不再持有线程node.thread null;retry:// 死循环for (;;) { // restart on removeWaiter race// 遍历链表 中的所有的 等待节点for (WaitNode pred null, q waiters, s; q ! null; q s) {// 取出下一个节点s q.next;// 要是此等待节点持有的线程不是 nullif (q.thread ! null)pred q; // 将前置节点设置为 当前节点q else if (pred ! null) {// 等待节点持有的线程是 null ,但是前置节点 不是 null// 前驱节点的next指向当前节点pred.next s; // 前置节点持有的线程不存在了表示存在竞争情况需要重新开始循环。执行下次死循环if (pred.thread null) // check for racecontinue retry;}
//等待节点持有的线程是 null ,但是前置节点 是 null, cas的方式成功的将节点下移当前节点从等待队列中移除执行下次死循环// cas的方式将waitersOffset处的值从q替换为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}// 内层遍历结束等待集合中没有无效节点break;}}}get(long timeout, TimeUnit unit) public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit null)throw new NullPointerException();int s state; // 获取当前对象的状态// 调用awaitDone方法来等待操作完成,如果返回的状态值小于等于COMPLETING,则表示操作未完成继续等待,如果等待的时间超过了超时时间则抛出TimeoutException异常。if (s COMPLETING (s awaitDone(true, unit.toNanos(timeout))) COMPLETING)throw new TimeoutException();//将最终的状态值作为参数传递给report方法并返回report方法的返回值。return report(s);
}cancel() public boolean cancel(boolean mayInterruptIfRunning) {if (!(state NEW UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;/*mayInterruptIfRunning:true: 可以中断正在执行的任务 INTERRUPTINGfasle: 不可以中断正在执行的任务 CANCELLED*/
// 状态是 new ;cas 的方式把 任务的状态从NEW修改为INTERRUPTING或CANCELLED。如果修改成功表示取消成功返回true。try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t runner; // 尝试中断正在执行任务的线程if (t ! null) // 如果任务的runner不为null则调用interrupt()方法中断线程。t.interrupt();} finally { // final state// 设置任务最终的状态 cas 的方式将任务的状态修改为INTERRUPTED。UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 调用finishCompletion()方法完成任务的处理finishCompletion();}return true; //返回true表示取消成功。}手撕FutureTask
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;public class FutureTask_T implements Runnable {private Future_T future;public FutureTask_(Future_T future) {this.future future;}public FutureTask_(Runnable runnable) {this.future new FutureAdaptive(runnable);}Overridepublic void run() {try{res this.future.code();state 1;}catch (Exception e){res e;state 2;}for (Thread thread : threadList){LockSupport.unpark(thread); // 唤醒}}private Object res;private volatile int state;private ListThread threadList new ArrayList();public T get(){for (;;){if(state 0){threadList.add(Thread.currentThread());LockSupport.park(); // 阻塞}else if(state 1){return (T)res;}else if(state 2){throw new RuntimeException(res.toString());}}}private class FutureAdaptive implements Future_T {public Runnable runnable;public FutureAdaptive(Runnable runnable) {this.runnable runnable;}Overridepublic T code() throws Exception {this.runnable.run();return null;}}
}class MM {public static void main(String[] args){Future_String future new Future_String() {Overridepublic String code() throws Exception {return future;}};Runnable runnable new Runnable(){Overridepublic void run() {System.out.println(runnable);}};FutureTask_String future_ new FutureTask_String(future);FutureTask_String runnable_ new FutureTask_String(runnable);new Thread(future_).start();new Thread(runnable_).start();System.out.println(future_.get());LockSupport.parkNanos(2*1000*1000*1000);}
}interface Future_T{T code() throws Exception;
}