上海网站建设高端定制网络服务公司,wordpress打开插件的时候很慢,求个没封的w站2022,设计一个商务网站Fork / Join框架是使用并发分治法解决问题的框架。 引入它们是为了补充现有的并发API。 在介绍它们之前#xff0c;现有的ExecutorService实现是运行异步任务的流行选择#xff0c;但是当任务同质且独立时#xff0c;它们会发挥最佳作用。 运行依赖的任务并使用这些实现来组… Fork / Join框架是使用并发分治法解决问题的框架。 引入它们是为了补充现有的并发API。 在介绍它们之前现有的ExecutorService实现是运行异步任务的流行选择但是当任务同质且独立时它们会发挥最佳作用。 运行依赖的任务并使用这些实现来组合其结果并不容易。 随着Fork / Join框架的引入人们试图解决这一缺陷。 在本文中我们将简要介绍API并解决几个简单的问题以了解其工作原理。 解决非阻塞任务 让我们直接跳入代码。 让我们创建一个任务该任务将返回List的所有元素的总和。 以下步骤以伪代码表示我们的算法 01.查找列表的中间索引 02.在中间划分列表 03.递归创建一个新任务该任务将计算剩余部分的总和 04.递归创建一个新任务该任务将计算正确部分的总和 05.将左总和中间元素和右总和的结果相加 这是代码– Slf4j
public class ListSummer extends RecursiveTaskInteger {private final ListInteger listToSum;ListSummer(ListInteger listToSum) {this.listToSum listToSum;}Overrideprotected Integer compute() {if (listToSum.isEmpty()) {log.info(Found empty list, sum is 0);return 0;}int middleIndex listToSum.size() / 2;log.info(List {}, middle Index: {}, listToSum, middleIndex);ListInteger leftSublist listToSum.subList(0, middleIndex);ListInteger rightSublist listToSum.subList(middleIndex 1, listToSum.size());ListSummer leftSummer new ListSummer(leftSublist);ListSummer rightSummer new ListSummer(rightSublist);leftSummer.fork();rightSummer.fork();Integer leftSum leftSummer.join();Integer rightSum rightSummer.join();int total leftSum listToSum.get(middleIndex) rightSum;log.info(Left sum is {}, right sum is {}, total is {}, leftSum, rightSum, total);return total;}
} 首先我们扩展了ForkJoinTask的RecursiveTask子类型。 这是我们期望并发任务返回结果时的扩展类型。 当任务不返回结果而仅执行效果时我们扩展RecursiveAction子类型。 对于我们解决的大多数实际任务这两个子类型就足够了。 其次RecursiveTask和RecursiveAction都定义了一种抽象计算方法。 这是我们进行计算的地方。 第三在我们的计算方法内部我们检查通过构造函数传递的列表的大小。 如果为空则我们已经知道总和的结果为零然后我们立即返回。 否则我们将列表分为两个子列表并创建ListSummer类型的两个实例。 然后我们在这两个实例上调用fork方法在ForkJoinTask中定义– leftSummer.fork();
rightSummer.fork(); 导致将这些任务安排为异步执行的原因稍后将在本文中解释用于此目的的确切机制。 之后我们调用join方法也在ForkJoinTask中定义以等待这两部分的结果 Integer leftSum leftSummer.join();
Integer rightSum rightSummer.join(); 然后将其与列表的中间元素相加以获得最终结果。 添加了许多日志消息以使示例更易于理解。 但是当我们处理包含数千个条目的列表时拥有详细的日志记录尤其是记录整个列表可能不是一个好主意。 就是这样。 现在为测试运行创建一个测试类– public class ListSummerTest {Testpublic void shouldSumEmptyList() {ListSummer summer new ListSummer(List.of());ForkJoinPool forkJoinPool new ForkJoinPool();forkJoinPool.submit(summer);int result summer.join();assertThat(result).isZero();}Testpublic void shouldSumListWithOneElement() {ListSummer summer new ListSummer(List.of(5));ForkJoinPool forkJoinPool new ForkJoinPool();forkJoinPool.submit(summer);int result summer.join();assertThat(result).isEqualTo(5);}Testpublic void shouldSumListWithMultipleElements() {ListSummer summer new ListSummer(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9));ForkJoinPool forkJoinPool new ForkJoinPool();forkJoinPool.submit(summer);int result summer.join();assertThat(result).isEqualTo(45);}
} 在测试中我们创建一个ForkJoinPool的实例。 ForkJoinPool是用于运行ForkJoinTasks的唯一ExecutorService实现。 它采用一种称为工作窃取算法的特殊算法。 与其他ExecutorService实现相反在该实现中只有一个队列包含要执行的所有任务在工作窃取实现中每个工作线程都获得其工作队列。 每个线程都从其队列开始执行任务。 当我们检测到ForkJoinTask可以分解为多个较小的子任务时便将它们分解为较小的任务然后在这些任务上调用fork方法。 该调用导致子任务被推入执行线程的队列中。 在执行期间当一个线程用尽队列/没有要执行的任务时它可以从其他线程的队列中“窃取”任务因此称为“工作窃取”。 与使用任何其他ExecutorService实现相比这种窃取行为可以带来更高的吞吐量。 之前当我们在leftSummer和rightSummer任务实例上调用fork时它们被推入执行线程的工作队列中之后它们被池中的其他活动线程“偷”依此类推因为它们确实那时没有其他事情要做。 很酷吧 解决阻止任务 我们刚才解决的问题本质上是非阻塞的。 如果我们想解决一个阻塞操作的问题那么为了获得更好的吞吐量我们将需要改变策略。 让我们用另一个例子来研究一下。 假设我们要创建一个非常简单的网络搜寻器。 该搜寻器将接收HTTP链接列表执行GET请求以获取响应主体然后计算响应长度。 这是代码– Slf4j
public class ResponseLengthCalculator extends RecursiveTaskMapString, Integer {private final ListString links;ResponseLengthCalculator(ListString links) {this.links links;}Overrideprotected MapString, Integer compute() {if (links.isEmpty()) {log.info(No more links to fetch);return Collections.emptyMap();}int middle links.size() / 2;log.info(Middle index: {}, links, middle);ResponseLengthCalculator leftPartition new ResponseLengthCalculator(links.subList(0, middle));ResponseLengthCalculator rightPartition new ResponseLengthCalculator(links.subList(middle 1, links.size()));log.info(Forking left partition);leftPartition.fork();log.info(Left partition forked, now forking right partition);rightPartition.fork();log.info(Right partition forked);String middleLink links.get(middle);HttpRequester httpRequester new HttpRequester(middleLink);String response;try {log.info(Calling managedBlock for {}, middleLink);ForkJoinPool.managedBlock(httpRequester);response httpRequester.response;} catch (InterruptedException ex) {log.error(Error occurred while trying to implement blocking link fetcher, ex);response ;}MapString, Integer responseMap new HashMap(links.size());MapString, Integer leftLinks leftPartition.join();responseMap.putAll(leftLinks);responseMap.put(middleLink, response.length());MapString, Integer rightLinks rightPartition.join();responseMap.putAll(rightLinks);log.info(Left map {}, middle length {}, right map {}, leftLinks, response.length(), rightLinks);return responseMap;}private static class HttpRequester implements ForkJoinPool.ManagedBlocker {private final String link;private String response;private HttpRequester(String link) {this.link link;}Overridepublic boolean block() {HttpGet headRequest new HttpGet(link);CloseableHttpClient client HttpClientBuilder.create().disableRedirectHandling().build();try {log.info(Executing blocking request for {}, link);CloseableHttpResponse response client.execute(headRequest);log.info(HTTP request for link {} has been executed, link);this.response EntityUtils.toString(response.getEntity());} catch (IOException e) {log.error(Error while trying to fetch response from link {}: {}, link, e.getMessage());this.response ;}return true;}Overridepublic boolean isReleasable() {return false;}}
} 我们创建ForkJoinPool.ManagedBlocker的实现在其中放置阻塞的HTTP调用。 该接口定义了两个方法– block和isReleasable 。 block方法是我们进行阻塞调用的地方。 在完成阻塞操作之后我们返回true指示不再需要进一步的阻塞。 我们从isReleasable实现中返回false以向fork-join工作线程指示block方法实现本质上可能在阻塞。 isReleasable实现将在调用block方法之前先由fork-join工作线程调用。 最后我们通过调用ForkJoinPool.managedBlock静态方法将HttpRequester实例提交到池中。 之后我们的阻止任务将开始执行。 当它阻塞HTTP请求时如果有必要ForkJoinPool.managedBlock方法还将安排激活备用线程以确保足够的并行性。 那么让我们将此实现用于测试驱动 这是代码– public class ResponseLengthCalculatorTest {Testpublic void shouldReturnEmptyMapForEmptyList() {ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(Collections.emptyList());ForkJoinPool pool new ForkJoinPool();pool.submit(responseLengthCalculator);MapString, Integer result responseLengthCalculator.join();assertThat(result).isEmpty();}Testpublic void shouldHandle200Ok() {ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of(http://httpstat.us/200));ForkJoinPool pool new ForkJoinPool();pool.submit(responseLengthCalculator);MapString, Integer result responseLengthCalculator.join();assertThat(result).hasSize(1).containsKeys(http://httpstat.us/200).containsValue(0);}Testpublic void shouldFetchResponseForDifferentResponseStatus() {ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of(http://httpstat.us/200,http://httpstat.us/302,http://httpstat.us/404,http://httpstat.us/502));ForkJoinPool pool new ForkJoinPool();pool.submit(responseLengthCalculator);MapString, Integer result responseLengthCalculator.join();assertThat(result).hasSize(4);}
} 今天就这样伙计们 与往常一样任何反馈/改进建议/评论都将受到高度赞赏 此处讨论的所有示例都可以在Github上找到 特定提交 。 大呼大叫的http://httpstat.us服务对于开发简单的测试非常有帮助。 翻译自: https://www.javacodegeeks.com/2019/01/brief-overview-fork-join-framework-java.html