山东聊城建设局网站,一站式服务平台官网,网站seo推广软件,牡丹江百度贴吧目录 一、前情提要二、通过Future获取异步返回值1、FutureTask 是基于 AbstractQueuedSynchronizer实现的2、FutureTask执行流程3、get()方法执行流程 三、FutureTask源码具体分析1、FutureTask源码2、将异步方法的返回值改为FutureInteger#xff0c;将返回值放到new… 目录 一、前情提要二、通过Future获取异步返回值1、FutureTask 是基于 AbstractQueuedSynchronizer实现的2、FutureTask执行流程3、get()方法执行流程 三、FutureTask源码具体分析1、FutureTask源码2、将异步方法的返回值改为FutureInteger将返回值放到new AsyncResult();中 3、通过FutureInteger.get()获取返回值4、这里也可以通过新线程Future获取Future返回值在BUG中磨砺在优化中成长 大家好我是哪吒。
一、前情提要
在上一篇文章中我们通过双异步的方式导入了10万行的Excel有个小伙伴在评论区问我如何保证插入后数据的一致性呢
很简单通过对比Excel文件行数和入库数量是否相等即可。
那么如何获取异步线程的返回值呢 二、通过Future获取异步返回值
我们可以通过给异步方法添加Future返回值的方式获取结果。
FutureTask 除了实现 Future 接口外还实现了 Runnable 接口。因此FutureTask 可以交给 Executor 执行也可以由调用线程直接执行FutureTask.run(。
1、FutureTask 是基于 AbstractQueuedSynchronizer实现的 AbstractQueuedSynchronizer简称AQS它是一个同步框架它提供通用机制来原子性管理同步状态、阻塞和唤醒线程以及 维护被阻塞线程的队列。 基于 AQS 实现的同步器包括 ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。 基于 AQS实现的同步器包含两种操作
acquire阻塞调用线程直到AQS的状态允许这个线程继续执行在FutureTask中get()就是这个方法release改变AQS的状态使state变为非阻塞状态在FutureTask中可以通过run()和cancel()实现。
2、FutureTask执行流程 执行Async异步方法建立新线程async-executor-X执行Runnable的run()方法FutureTask实现RunnableFutureRunnableFuture实现Runnable判断状态state 如果未新建或者不处于AQS直接返回否则进入COMPLETING状态执行异步线程代码 如果执行cancel()方法改变AQS的状态时会唤醒AQS等待队列中的第一个线程线程async-executor-1线程async-executor-1被唤醒后 将自己从AQS队列中移除然后唤醒next线程async-executor-2改变线程async-executor-1的state等待get()线程取值。 next等待线程被唤醒后循环线程async-executor-1的步骤 被唤醒从AQS队列中移除唤醒next线程改变异步线程状态 新建线程async-executor-N监听异步方法的state 如果处于EXCEPTIONAL以上状态抛出异常如果处于COMPLETING状态加入AQS队列等待如果处于NORMAL状态返回结果
3、get()方法执行流程
get()方法通过判断状态state观测异步线程是否已结束如果结束直接将结果返回否则会将等待节点扔进等待队列自旋阻塞住线程。
自旋直至异步线程执行完毕获取另一边的线程计算出结果或取消后将等待队列里的所有节点依次唤醒并移除队列。 如果state小于等于COMPLETING表示任务还在执行中 如果线程被中断从等待队列中移除等待节点WaitNode抛出中断异常如果state大于COMPLETING 如果已有等待节点WaitNode将线程置空返回当前状态 如果任务正在执行让出时间片如果还未构造等待节点则new一个新的等待节点如果未入队列CAS尝试入队如果有超时时间参数 计算超时时间如果超时则从等待队列中移除等待节点WaitNode返回当前状态state阻塞队列nanos毫秒。 否则阻塞队列 如果state大于COMPLETING 如果执行完毕返回结果如果大于等于取消状态则抛出异常。
很多小朋友对读源码嗤之以鼻工作3年、5年还是没认真读过任何源码觉得读了也没啥用或者读了也看不懂~
其实只要把源码的执行流程通过画图的形式呈现出来你就会幡然醒悟原来是这样的~
简而言之
1. 如果异步线程还没执行完则进入CAS自旋 2. 其它线程获取结果或取消后重新唤醒CAS队列中等待的线程 3. 再通过get()判断状态state 4. 直至返回结果或取消、超时、异常为止。
三、FutureTask源码具体分析
1、FutureTask源码
通过定义整形状态值判断state大小这个思想很有意思值得学习。
public interface RunnableFutureV extends Runnable, FutureV {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}public class FutureTaskV implements RunnableFutureV {// 最初始的状态是new 新建状态private volatile int state;private static final int NEW 0; // 新建状态private static final int COMPLETING 1; // 完成中private static final int NORMAL 2; // 正常执行完private static final int EXCEPTIONAL 3; // 异常private static final int CANCELLED 4; // 取消private static final int INTERRUPTING 5; // 正在中断private static final int INTERRUPTED 6; // 已中断public V get() throws InterruptedException, ExecutionException {int s state;// 任务还在执行中if (s COMPLETING)s awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline timed ? System.nanoTime() nanos : 0L;WaitNode q null;boolean queued false;for (;;) {// 线程被中断从等待队列中移除等待节点WaitNode抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s state;// 任务已执行完毕或取消if (s COMPLETING) {// 如果已有等待节点WaitNode将线程置空if (q ! null)q.thread null;return s;}// 任务正在执行让出时间片else if (s COMPLETING) // cannot time out yetThread.yield();// 还未构造等待节点则new一个新的等待节点else if (q null)q new WaitNode();// 未入队列CAS尝试入队else if (!queued)queued UNSAFE.compareAndSwapObject(this, waitersOffset,q.next waiters, q);// 如果有超时时间参数else if (timed) {// 计算超时时间nanos deadline - System.nanoTime();// 如果超时则从等待队列中移除等待节点WaitNode返回当前状态stateif (nanos 0L) {removeWaiter(q);return state;}// 阻塞队列nanos毫秒LockSupport.parkNanos(this, nanos);}else// 阻塞队列LockSupport.park(this);}}private V report(int s) throws ExecutionException {// 获取outcome中记录的返回结果Object x outcome;// 如果执行完毕返回结果if (s NORMAL)return (V)x;// 如果大于等于取消状态则抛出异常if (s CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
}2、将异步方法的返回值改为FutureInteger将返回值放到new AsyncResult();中
Async(async-executor)
public void readXls(String filePath, String filename) {try {// 此代码为简化关键性代码ListFutureInteger futureList new ArrayList();for (int time 0; time times; time) {FutureInteger sumFuture readExcelDataAsyncFutureService.readXlsCacheAsync();futureList.add(sumFuture);}}catch (Exception e){logger.error(readXlsCacheAsync---插入数据异常,e);}
}Async(async-executor)
public FutureInteger readXlsCacheAsync() {try {// 此代码为简化关键性代码return new AsyncResult(sum);}catch (Exception e){return new AsyncResult(0);}
}3、通过FutureInteger.get()获取返回值
public static boolean getFutureResult(ListFutureInteger futureList, int excelRow) throws Exception{int[] futureSumArr new int[futureList.size()];for (int i 0;ifutureList.size();i) {try {FutureInteger future futureList.get(i);while (true) {if (future.isDone() !future.isCancelled()) {Integer futureSum future.get();logger.info(获取Future返回值成功----Future: future ,Result: futureSum);futureSumArr[i] futureSum;break;} else {logger.info(Future正在执行---获取Future返回值中---等待3秒);Thread.sleep(3000);}}} catch (Exception e) {logger.error(获取Future返回值异常: , e);}}boolean insertFlag getInsertSum(futureSumArr, excelRow);logger.info(获取所有异步线程Future的返回值成功Excel插入结果insertFlag);return insertFlag;
}4、这里也可以通过新线程Future获取Future返回值
不过感觉多此一举了就当练习Future异步取返回值了~
public static FutureBoolean getFutureResultThreadFuture(ListFutureInteger futureList, int excelRow) throws Exception {ExecutorService service Executors.newSingleThreadExecutor();final boolean[] insertFlag {false};service.execute(new Runnable() {public void run() {try {insertFlag[0] getFutureResult(futureList, excelRow);} catch (Exception e) {logger.error(新线程Future获取Future返回值异常: , e);insertFlag[0] false;}}});service.shutdown();return new AsyncResult(insertFlag[0]);
}获取异步线程结果后我们可以通过添加事务的方式实现Excel入库操作的数据一致性。
但Future会造成主线程的阻塞这个就很不友好了有没有更优解呢 在BUG中磨砺在优化中成长
使用双异步后从 191s 优化到 2s
增加索引 异步 不落地后从 12h 优化到 15 min
使用懒加载 零拷贝后程序的秒开率提升至99.99%
性能优化2.0新增缓存后程序的秒开率不升反降 文章收录于100天精通Java从入门到就业
全网最细Java零基础手把手入门教程系列课程包括Java基础、Java8新特性、Java集合、高并发、性能优化等适合零基础和进阶提升的同学。
哪吒多年工作总结Java学习路线总结搬砖工逆袭Java架构师。 华为OD机试 2023B卷题库疯狂收录中刷题点这里 刷的越多抽中的概率越大每一题都有详细的答题思路、详细的代码注释、样例测试发现新题目随时更新全天CSDN在线答疑。