宁波做网站哪里专业,山西自助建站费用低,广州 美容 公司 网站建设,电子商务网站开发案例文章目录 AsyncToolAsyncTool是什么#xff1f;AsyncTool快速入门1#xff09;导入依赖2#xff09;自定义Worker3#xff09;编排包装类Wrapper4#xff09;提交任务执行5#xff09;运行结果 并发编程常见的场景串行并行阻塞等待 - 先串行#xff0c;后并行阻塞等待 -… 文章目录 AsyncToolAsyncTool是什么AsyncTool快速入门1导入依赖2自定义Worker3编排包装类Wrapper4提交任务执行5运行结果 并发编程常见的场景串行并行阻塞等待 - 先串行后并行阻塞等待 - 先并行后串行 AsyncTool
AsyncTool是什么
是京东开源的一个可编排多线程框架可解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架。可以任意组合各线程的执行顺序并且带有全链路执行结果回调。是多线程编排一站式解决方案。注它是单机的不支持分布式编排是对CompletableFuture的进一步封装。这里对框架的使用做一下总结供日后工作中方便查看。
AsyncTool快速入门
A、B、C串行任务示例。
1导入依赖
去gitee搜AsyncTool京东开源项目。
2自定义Worker
自定义线程任务A、B、C实现IWorkerICallback函数式接口并重写下面的4个方法。
begin()Worker开始执行前先回调begin()action()Worker中执行耗时操作的地方比如RPC接口调用。result()action()执行完毕后回调result方法可以在此处处理action中的返回值。defaultValue()整个Worker执行异常或者超时会回调defaultValue()Worker返回默认值。
workerA
(action模拟线程任务耗时操作此处举例仅对参数1)
public class WorkerA implements IWorkerInteger, Integer, ICallbackInteger, Integer {/*** Worker开始的时候先执行begin*/Overridepublic void begin() {System.out.println(A - Thread: Thread.currentThread().getName() - start -- SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* param object object* param allWrappers 任务包装* return*/Overridepublic Integer action(Integer object, MapString, WorkerWrapper allWrappers) {Integer res object 1;return res;}/*** action执行结果的回调* param success* param param* param workResult*/Overridepublic void result(boolean success, Integer param, WorkResultInteger workResult) {System.out.println(A - param: JSON.toJSONString(param));System.out.println(A - result: JSON.toJSONString(workResult));System.out.println(A - Thread: Thread.currentThread().getName() - end -- SystemClock.now());}/*** Worker异常时的回调* return*/Overridepublic Integer defaultValue() {System.out.println(A - defaultValue);return 101;}
}workerB
(action()模拟线程任务耗时操作此处举例仅对参数2)
public class WorkerB implements IWorkerInteger, Integer, ICallbackInteger, Integer {/*** Worker开始的时候先执行begin*/Overridepublic void begin() {System.out.println(B - Thread: Thread.currentThread().getName() - start -- SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* param object object* param allWrappers 任务包装* return*/Overridepublic Integer action(Integer object, MapString, WorkerWrapper allWrappers) {Integer res object 2;return res;}/*** action执行结果的回调* param success* param param* param workResult*/Overridepublic void result(boolean success, Integer param, WorkResultInteger workResult) {System.out.println(B - param: JSON.toJSONString(param));System.out.println(B - result: JSON.toJSONString(workResult));System.out.println(B - Thread: Thread.currentThread().getName() - end -- SystemClock.now());}/*** Worker异常时的回调* return*/Overridepublic Integer defaultValue() {System.out.println(B - defaultValue);return 102;}
}WorkerC
(action()模拟线程任务耗时操作此处举例仅对参数3)
public class WorkerC implements IWorkerInteger, Integer, ICallbackInteger, Integer {/*** Worker开始的时候先执行begin*/Overridepublic void begin() {System.out.println(C - Thread: Thread.currentThread().getName() - start -- SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* param object object* param allWrappers 任务包装* return*/Overridepublic Integer action(Integer object, MapString, WorkerWrapper allWrappers) {Integer res object 3;return res;}/*** action执行结果的回调* param success* param param* param workResult*/Overridepublic void result(boolean success, Integer param, WorkResultInteger workResult) {System.out.println(C - param: JSON.toJSONString(param));System.out.println(C - result: JSON.toJSONString(workResult));System.out.println(C - Thread: Thread.currentThread().getName() - end -- SystemClock.now());}/*** Worker异常时的回调* return*/Overridepublic Integer defaultValue() {System.out.println(C - defaultValue);return 103;}
}3编排包装类Wrapper
Worker创建好之后使用WorkerWrapper对Worker进行包装以及编排WorkerWrapper是AsyncTool组件的最小可执行任务单元。
C是最后一步它没有next。B的next是CA的next是B。编排顺序就是C - B - A
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA new WorkerA();WorkerB workerB new WorkerB();WorkerC workerC new WorkerC();//包装Worker编排串行顺序C - B - A//C是最后一步它没有nextWorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3)//33.build();//B的next是CWorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2)//22.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1)//11.next(wrapperB).build();try {//Action 提交任务Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}或者还有一种写法可以使用depend方式编排
//A没有depend
WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1).build();//B的depend是A
WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2).depend(wrapperA).build();//C的depend是B
WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3).depend(wrapperB).build();
//begin
Async.beginWork(1000, wrapperA);
4提交任务执行
通过执行器类Async的beginWork方法提交任务执行。
//默认不定长线程池
private static final ThreadPoolExecutor COMMON_POOL (ThreadPoolExecutor) Executors.newCachedThreadPool();//提交任务
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)Timeout全组任务超时时间设定如果Worker任务超时则Worker结果使用defaultValue()默认值。ExecutorService executorService自定义线程池不自定义的话就走默认的COMMON_POOL。默认的线程池是不定长线程池。WorkerWrapper… workerWrapper起始任务可以是多个。注意不要提交中间节点的任务只需要提交起始任务即可编排的后续任务会自动执行。
5运行结果
运行结果A112B224C336
并发编程常见的场景
串行
Worker创建好之后使用WorkerWrapper对Worker进行包装以及编排WorkerWrapper是AsyncTool组件的最小可执行任务单元。
C是最后一步它没有next。B的next是CA的next是B。编排顺序就是C - B - A
1next写法
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA new WorkerA();WorkerB workerB new WorkerB();WorkerC workerC new WorkerC();//包装Worker编排串行顺序C - B - A//C是最后一步它没有nextWorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3)//33.build();//B的next是CWorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2)//22.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1)//11.next(wrapperB).build(); try {//Action 提交任务Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}并行
WorkerWrapper并行编排A\B\C都没有next和depend 3个WorkerWrapper一起begin。
Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA new WorkerA();WorkerB workerB new WorkerB();WorkerC workerC new WorkerC();/*** 包装Worker编排并行顺序*///AWorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1)//11.build();//BWorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2)//22.build();//CWorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3)//33.build();try {//3个WorkerWrapper一起beginAsync.beginWork(1000, wrapperA, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}
阻塞等待 - 先串行后并行
阻塞等待 - 先串行后并行场景模拟A先执行对参数1A执行完毕之后B\C同时并行执行B任务基于A的返回值2C任务基于A的返回值3
1next写法
public static void nextWork() {//引入Worker工作单元WorkerA workerA new WorkerA();WorkerB workerB new WorkerB();WorkerC workerC new WorkerC();//C是最后一步它没有nextWorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(null)//没有参数根据A的返回值3.build();//B是最后一步它没有nextWorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(null)//没有参数根据A的返回值2.build();//A的next是B、CWorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1)//11//next是B、C.next(wrapperB, wrapperC).build();try {//ActionAsync.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}2depend写法
//A没有depend就是开始
WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(1).build();//C depend A
WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(null).depend(wrapperA)//依赖A.build();
W
//B depend A
WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(null).depend(wrapperA)//依赖A.build();
阻塞等待 - 先并行后串行
B\C并行执行。B对参数2C对参数3B\C全部执行完后A B返回值C返回值。
注意需要B和C同时begin。Async.beginWork(4000, wrapperB, wrapperC);
1next写法
public static void nextWork() {//引入Worker工作单元WorkerA workerA new WorkerA();WorkerB workerB new WorkerB();WorkerC workerC new WorkerC();//A是最后一步没有nextWorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(null)//参数是nullA B C.build();//C next AWorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3)//33 6.next(wrapperA).build();//B next AWorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2)//22 4.next(wrapperA).build();try {new SynchronousQueueRunnable();//ActionAsync.beginWork(4000, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}2depend写法
//C没有depend是起始节点
WorkerWrapper wrapperC new WorkerWrapper.BuilderInteger, Integer().id(workerC).worker(workerC).callback(workerC).param(3)//33 6.build();
//B没有depend是起始节点
WorkerWrapper wrapperB new WorkerWrapper.BuilderInteger, Integer().id(workerB).worker(workerB).callback(workerB).param(2)//22 4.build();
//A depend B,C
WorkerWrapper wrapperA new WorkerWrapper.BuilderInteger, Integer().id(workerA).worker(workerA).callback(workerA).param(null)//参数是nullA B C.depend(wrapperB, wrapperC).build();