提升网站流量该怎么做,云虚拟主机搭建wordpress,网站建设的文件,烟花代码htmlJava的Future机制详解 一、为什么出现Future机制二、Future的相关类图2.1 Future 接口2.2 FutureTask 类 三、FutureTask的使用方法四、FutureTask源码分析4.1 state字段4.2 其他变量4.4 构造函数4.5 run方法及其他 一、为什么出现Future机制
常见的两种创建线程的方式。一种是… Java的Future机制详解 一、为什么出现Future机制二、Future的相关类图2.1 Future 接口2.2 FutureTask 类 三、FutureTask的使用方法四、FutureTask源码分析4.1 state字段4.2 其他变量4.4 构造函数4.5 run方法及其他 一、为什么出现Future机制
常见的两种创建线程的方式。一种是直接继承Thread另外一种就是实现Runnable接口。
这两种方式都有一个缺陷就是在执行完任务之后无法获取执行结果。
从Java 1.5开始就提供了Callable和Future通过它们可以在任务执行完毕之后得到任务执行结果。
Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。因为可以异步获得执行结果所以不用一直同步等待去获得执行结果 上图简单描述了不使用Future和使用Future的区别不使用Future模式主线程在invoke完一些耗时逻辑之后需要等待这个耗时逻辑在实际应用中可能是一次RPC调用可能是一个本地IO操作等。B图表达的是使用Future模式之后我们主线程在invoke之后可以立即返回去做其他的事情回头再来看看刚才提交的invoke有没有结果。
二、Future的相关类图
2.1 Future 接口
首先我们需要清楚Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果该方法会阻塞直到任务返回结果。 接口定义行为我们通过上图可以看到实现Future接口的子类会具有哪些行为
我们可以取消这个执行逻辑如果这个逻辑已经正在执行提供可选的参数来控制是否取消已经正在执行的逻辑。我们可以判断执行逻辑是否已经被取消。我们可以判断执行逻辑是否已经执行完成。我们可以获取执行逻辑的执行结果。我们可以允许在一定时间内去等待获取执行结果如果超过这个时间抛TimeoutException。
2.2 FutureTask 类
类图如下 FutureTask是Future的具体实现。FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Runnable 和 Future 接口。所以FutureTask既可以作为Runnable被线程执行又可以作为Future得到Callable的返回值。
三、FutureTask的使用方法
举个例子假设我们要执行一个算法算法需要两个输入 input1 和 input2, 但是input1和input2需要经过一个非常耗时的运算才能得出。由于算法必须要两个输入都存在才能给出输出所以我们必须等待两个输入的产生。接下来就模仿一下这个过程。
package src;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class FutureTaskTest {public static void main(String[] args) throws InterruptedException, ExecutionException {long starttime System.currentTimeMillis();//input2生成 需要耗费3秒FutureTaskInteger input2_futuretask new FutureTask(new CallableInteger() {Overridepublic Integer call() throws Exception {Thread.sleep(3000);return 5;}});new Thread(input2_futuretask).start();//input1生成需要耗费2秒FutureTaskInteger input1_futuretask new FutureTask(new CallableInteger() {Overridepublic Integer call() throws Exception {Thread.sleep(2000);return 3;}});new Thread(input1_futuretask).start();Integer integer2 input2_futuretask.get();Integer integer1 input1_futuretask.get();System.out.println(algorithm(integer1, integer2));long endtime System.currentTimeMillis();System.out.println(用时 String.valueOf(endtime - starttime));}//这是我们要执行的算法public static int algorithm(int input, int input2) {return input input2;}
}输出结果 我们可以看到用时3001毫秒与最费时的input2生成时间差不多。 注意我们在程序中生成input1时也让线程休眠了2秒但是结果不是32。说明FutureTask是被异步执行了。
四、FutureTask源码分析
4.1 state字段
volatile修饰的state字段表示FutureTask当前所处的状态。可能的转换过程见注释。 /*** Possible state transitions:* NEW - COMPLETING - NORMAL创建到正常运行结束的状态变化轨迹* NEW - COMPLETING - EXCEPTIONAL创建到异常运行结束的状态变化轨迹* NEW - CANCELLED 创建到取消的状态变化轨迹* NEW - INTERRUPTING - INTERRUPTED创建到中断结束的状态变化轨迹*/private volatile int state;// NEW 新建状态表示这个 FutureTask还没有开始运行private static final int NEW 0;// COMPLETING 完成状态 表示 FutureTask 任务已经计算完毕了// 但是还有一些后续操作例如唤醒等待线程操作还没有完成。private static final int COMPLETING 1;// FutureTask 任务完结正常完成没有发生异常private static final int NORMAL 2;// FutureTask 任务完结因为发生异常。private static final int EXCEPTIONAL 3;// FutureTask 任务完结因为取消任务private static final int CANCELLED 4;// FutureTask 任务完结也是取消任务不过发起了中断运行任务线程的中断请求private static final int INTERRUPTING 5;// FutureTask 任务完结也是取消任务已经完成了中断运行任务线程的中断请求private static final int INTERRUPTED 6;4.2 其他变量 /** 任务 */private CallableV callable;/** 储存结果*/private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程*/private volatile Thread runner;/** get方法阻塞的线程队列 */private volatile WaitNode waiters;//FutureTask的内部类get方法的等待队列static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread Thread.currentThread(); }}4.3 CAS工具初始化 // Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? k FutureTask.class;stateOffset UNSAFE.objectFieldOffset(k.getDeclaredField(state));runnerOffset UNSAFE.objectFieldOffset(k.getDeclaredField(runner));waitersOffset UNSAFE.objectFieldOffset(k.getDeclaredField(waiters));} catch (Exception e) {throw new Error(e);}}这段代码是为了后面使用CAS而准备的。可以这么理解
一个java对象可以看成是一段内存各个字段都得按照一定的顺序放在这段内存里同时考虑到对齐要求可能这些字段不是连续放置的用这个UNSAFE.objectFieldOffset方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量因为是相对偏移量所以它其实跟某个具体对象又没什么太大关系跟class的定义和虚拟机的内存模型的实现细节更相关。
4.4 构造函数
FutureTask有两个构造函数
public FutureTask(CallableV callable) {if (callable null)throw new NullPointerException();this.callable callable;this.state NEW; // ensure visibility of callable
}public FutureTask(Runnable runnable, V result) {this.callable Executors.callable(runnable, result);this.state NEW; // ensure visibility of callable
}这两个构造函数区别在于如果使用第一个构造函数最后获取线程实行结果就是callable的执行的返回结果而如果使用第二个构造函数那么最后获取线程实行结果就是参数中的result接下来让我们看一下FutureTask的run方法。
同时两个构造函数都把当前状态设置为NEW。
4.5 run方法及其他
构造完FutureTask后会把它当做线程的参数传进去然后线程就会运行它的run方法。所以我们先来看一下run方法
public void run() {//如果状态不是new或者runner旧值不为null(已经启动过了)就结束if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {CallableV c callable; // 这里的callable是从构造方法里面传人的if (c ! null state NEW) {V result;boolean ran;try {result c.call(); //执行任务并将结果保存在result字段里。ran true;} catch (Throwable ex) {result null;ran false;setException(ex); // 保存call方法抛出的异常}if (ran)set(result); // 保存call方法的执行结果}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner null;// state must be re-read after nulling runner to prevent// leaked interruptsint s state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}}其中catch语句中的setException(ex)如下
//发生异常时设置state和outcome
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion();// 唤醒get()方法阻塞的线程}}而正常完成时set(result);方法如下
//正常完成时设置state和outcome
protected void set(V v) {
//正常完成时NEW-COMPLETING-NORMALif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); // 唤醒get方法阻塞的线程}}这两个set方法中都是用到了finishCompletion()去唤醒get方法阻塞的线程。下面来看看这个方法
//移除并唤醒所有等待的线程调用done并清空callable
private void finishCompletion() {// assert state COMPLETING;for (WaitNode q; (q waiters) ! null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t q.thread;if (t ! null) {q.thread null;LockSupport.unpark(t); //唤醒线程}//接下来的这几句代码是将当前节点剥离出队列然后将q指向下一个等待节点。被剥离的节点由于thread和next都为null所以会被GC回收。WaitNode next q.next;if (next null)break;q.next null; // unlink to help gcq next;}break;}}done(); //这个是空的方法子类可以覆盖实现回调的功能。callable null; // to reduce footprint}好到这里我们把运行以及设置结果的流程分析完了。那接下来看一下怎么获得执行结果把。也就是get方法。
get方法有两个一个是有超时时间设置另一个没有超时时间设置。 public V get() throws InterruptedException, ExecutionException {int s state;if (s COMPLETING)s awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {// get(timeout, unit) 也很简单, 主要还是在 awaitDone里面if(unit null){throw new NullPointerException();}int s state;// 判断state状态是否 Completing, 调用awaitDone进行自旋等待if(s COMPLETING (s awaitDone(true, unit.toNanos(timeout))) COMPLETING){throw new TimeoutException();}// 根据state的值进行返回结果或抛出异常return report(s);}两个get方法都用到了awaitDone()。这个方法的作用是 等待任务执行完成、被中断或超时。看一下源码 //等待完成可能是是中断、异常、正常完成timed:true考虑等待时长false:不考虑等待时长private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline timed ? System.nanoTime() nanos : 0L; //如果设置了超时时间WaitNode q null;boolean queued false;for (;;) {/*** 有优先级顺序* 1、如果线程已中断则直接将当前节点q从waiters中移出* 2、如果state已经是最终状态了则直接返回state* 3、如果state是中间状态(COMPLETING),意味很快将变更过成最终状态让出cpu时间片即可* 4、如果发现尚未有节点则创建节点* 5、如果当前节点尚未入队则将当前节点放到waiters中的首节点并替换旧的waiters* 6、线程被阻塞指定时间后再唤醒* 7、线程一直被阻塞直到被其他线程唤醒**/if (Thread.interrupted()) {// 1removeWaiter(q);throw new InterruptedException();}int s state;if (s COMPLETING) {// 2if (q ! null)q.thread null;return s; }else if (s COMPLETING) // 3Thread.yield();else if (q null) // 4q new WaitNode();else if (!queued) // 5queued UNSAFE.compareAndSwapObject(this, waitersOffset, q.next waiters, q);else if (timed) {// 6nanos deadline - System.nanoTime();if (nanos 0L) {removeWaiter(q); //从waiters中移出节点qreturn state; }LockSupport.parkNanos(this, nanos); }else // 7LockSupport.park(this);}}接下来看下removeWaiter()移除等待节点的源码 private void removeWaiter(WaitNode node) {if (node ! null) {node.thread null; // 将移除的节点的threadnull, 为移除做标示retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred null, q waiters, s; q ! null; q s) {s q.next;//通过 thread 判断当前 q 是否是需要移除的 q节点因为我们刚才标示过了if (q.thread ! null) pred q; //当不是我们要移除的节点就往下走else if (pred ! null) {//当p.threadnull时到这里。下面这句话相当于把q从队列移除。pred.next s;//pred.thread null 这种情况是在多线程进行并发 removeWaiter 时产生的//此时正好移除节点 node 和 pred, 所以loop跳到retry, 从新进行这个过程。想象一下如果在并发的情况下其他线程把pred的线程置为空了。那说明这个链表不应该包含pred了。所以我们需要跳到retry从新开始。if (pred.thread null) // check for racecontinue retry;}//到这步说明p.threadnull 并且 prednull。说明node是头结点。else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}
最后在get方法中调用report(s)根据状态s的不同进行返回结果或抛出异常。 private V report(int s) throws ExecutionException {Object x outcome; //之前我们set的时候已经设置过这个值了。所以直接用。if (s NORMAL)return (V)x; //正常执行结束返回结果if (s CANCELLED)throw new CancellationException(); //被取消或中断了就抛异常。throw new ExecutionException((Throwable)x);}以上就是FutureTask的源码分析。
最后总结一下
FutureTask既可以当做Runnable也可以当做Future。线程通过执行FutureTask的run方法将正常运行的结果放入FutureTask类的result变量中。使用get方法可以阻塞直到获得结果。