瓯海网站建设,成都彩蝶花卉网站建设案例,台州做网站需要多少钱,手机网站商城建设答辩问题叉叉框架本文是我们名为Java Concurrency Essentials的学院课程的一部分。 在本课程中#xff0c;您将深入探讨并发的魔力。 将向您介绍并发和并发代码的基础知识#xff0c;并学习诸如原子性#xff0c;同步和线程安全性的概念。 在这里查看 #xff01; 目录 1.简介 2… 叉叉框架 本文是我们名为Java Concurrency Essentials的学院课程的一部分。 在本课程中您将深入探讨并发的魔力。 将向您介绍并发和并发代码的基础知识并学习诸如原子性同步和线程安全性的概念。 在这里查看 目录 1.简介 2.叉/连接 2.1。 递归任务 2.2。 递归动作 2.3。 ForkJoinPool和ExecutorService 1.简介 本文介绍了Fork / Join框架该框架从1.7版开始就是JDK的一部分。 它描述了框架的基本功能并提供了一些示例以提供一些实践经验。 2.叉/连接 Fork / Join框架的基类是java.util.concurrent.ForkJoinPool 。 此类实现Executor和ExecutorService这两个接口并AbstractExecutorService 。 因此 ForkJoinPool基本上是一个线程池它承担特殊任务即ForkJoinTask 。 此类实现已知的Future接口以及诸如get() cancel()和isDone() 。 除此之外该类还提供了两个为整个框架命名的方法 fork()和join() 。 调用fork()将启动任务的异步执行时调用join()将等待直到任务完成并检索其结果。 因此我们可以将给定任务拆分为多个较小的任务分叉每个任务最后等待所有任务完成。 这使复杂问题的实现更加容易。 在计算机科学中这种方法也称为分治法。 每当一个问题太复杂而无法立即解决时它就会分为多个较小的问题并且更容易解决。 可以这样写成伪代码 if(problem.getSize() THRESHOLD) {SmallerProblem smallerProblem1 new SmallerProblem();smallerProblem1.fork();SmallerProblem smallerProblem2 new SmallerProblem();smallerProblem2.fork();return problem.solve(smallerProblem1.join(), smallerProblem2.join());
} else {return problem.solve();
} 首先我们检查问题的当前大小是否大于给定的阈值。 在这种情况下我们将问题分成较小的问题对每个新任务进行fork() 然后通过调用join()等待结果。 当join()返回每个子任务的结果时我们必须找到较小问题的最佳解决方案并将其作为最佳解决方案返回。 重复这些步骤直到给定的阈值太低并且问题很小我们可以直接计算其解而无需进一步除法。 递归任务 为了更好地掌握此过程我们实现了一种算法该算法可在整数值数组中找到最小的数字。 这个问题不是您使用ForkJoinPool在日常工作中解决的问题但是以下实现非常清楚地显示了基本原理。 在main()方法中我们设置了一个带有随机值的整数数组并创建了一个新的ForkJoinPool 。 传递给其构造函数的第一个参数是所需并行度的指示器。 在这里我们在Runtime查询可用的CPU内核数。 然后我们调用invoke()方法并传递FindMin的实例。 FindMin扩展了RecursiveTask类该类本身是前面提到的ForkJoinTask的子类。 类ForkJoinTask实际上有两个子类一个子类用于返回值的任务 RecursiveTask 另一个子类用于不返回值的任务 RecursiveAction 。 超类迫使我们实现compute() 。 在这里我们看一下整数数组的给定切片并确定当前问题是否太大而无法立即解决。 当在数组中找到最小的数时要直接解决的最小问题大小是将两个元素相互比较并返回它们的最小值。 如果当前有两个以上的元素则将数组分为两部分然后再在这两个部分中找到最小的数字。 通过创建两个新的FindMin实例来完成此操作。 构造函数被提供给数组以及开始和结束索引。 然后我们通过调用fork()异步开始执行这两个任务。 该调用将两个任务提交到线程池的队列中。 线程池实现了一种称为工作窃取的策略即如果所有其他线程都有足够的工作要做则当前线程会从其他任务之一中窃取其工作。 这样可以确保任务尽快执行。 public class FindMin extends RecursiveTaskInteger {private static final long serialVersionUID 1L;private int[] numbers;private int startIndex;private int endIndex;public FindMin(int[] numbers, int startIndex, int endIndex) {this.numbers numbers;this.startIndex startIndex;this.endIndex endIndex;}Overrideprotected Integer compute() {int sliceLength (endIndex - startIndex) 1;if (sliceLength 2) {FindMin lowerFindMin new FindMin(numbers, startIndex, startIndex (sliceLength / 2) - 1);lowerFindMin.fork();FindMin upperFindMin new FindMin(numbers, startIndex (sliceLength / 2), endIndex);upperFindMin.fork();return Math.min(lowerFindMin.join(), upperFindMin.join());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) {int[] numbers new int[100];Random random new Random(System.currentTimeMillis());for (int i 0; i numbers.length; i) {numbers[i] random.nextInt(100);}ForkJoinPool pool new ForkJoinPool(Runtime.getRuntime().availableProcessors());Integer min pool.invoke(new FindMin(numbers, 0, numbers.length - 1));System.out.println(min);}
}递归动作 正如上面在RecursiveTask旁边提到的我们还有RecursiveAction类。 与RecursiveTask相比它不必返回值因此可以将其用于可以直接在给定数据结构上执行的异步计算。 这样的例子是从彩色图像中计算出灰度图像。 我们要做的就是遍历图像的每个像素并使用以下公式从RGB值中计算灰度值 gray 0.2126 * red 0.7152 * green 0.0722 * blue 浮点数表示特定颜色对我们人类对灰色的感知做出的贡献。 由于最高值用于绿色因此可以得出结论灰度图像仅被计算为绿色部分的近3/4。 因此假设图像是代表实际像素数据的对象并且使用setRGB()和getRGB()方法检索实际RGB值则基本实现将如下所示 for (int row 0; row height; row) {for (int column 0; column bufferedImage.getWidth(); column) {int grayscale computeGrayscale(image.getRGB(column, row));image.setRGB(column, row, grayscale);}
} 上面的实现在单个CPU机器上运行良好。 但是如果我们有多个CPU可用我们可能希望将此工作分配给可用的内核。 因此我们可以使用ForkJoinPool并为图像的每一行或每一列提交一个新任务而不是遍历所有像素的两个嵌套for循环。 一旦将一行转换为灰度当前线程就可以在另一行上工作。 以下示例实现了此原理 public class GrayscaleImageAction extends RecursiveAction {private static final long serialVersionUID 1L;private int row;private BufferedImage bufferedImage;public GrayscaleImageAction(int row, BufferedImage bufferedImage) {this.row row;this.bufferedImage bufferedImage;}Overrideprotected void compute() {for (int column 0; column bufferedImage.getWidth(); column) {int rgb bufferedImage.getRGB(column, row);int r (rgb 16) 0xFF;int g (rgb 8) 0xFF;int b (rgb 0xFF);int gray (int) (0.2126 * (float) r 0.7152 * (float) g 0.0722 * (float) b);gray (gray 16) (gray 8) gray;bufferedImage.setRGB(column, row, gray);}}public static void main(String[] args) throws IOException {ForkJoinPool pool new ForkJoinPool(Runtime.getRuntime().availableProcessors());BufferedImage bufferedImage ImageIO.read(new File(args[0]));for (int row 0; row bufferedImage.getHeight(); row) {GrayscaleImageAction action new GrayscaleImageAction(row, bufferedImage);pool.execute(action);}pool.shutdown();ImageIO.write(bufferedImage, jpg, new File(args[1]));}
} 在main方法中我们使用Java的ImageIO类读取图像。 返回的BufferedImage实例具有我们需要的所有方法。 我们可以查询行数和列数并检索和设置每个像素的RGB值。 因此我们要做的是遍历所有行并将新的GrayscaleImageAction提交到我们的ForkJoinPool 。 后者已收到有关可用处理器的提示作为其构造函数的参数。 现在 ForkJoinPool通过调用其compute()方法来异步启动任务。 在此方法中我们遍历每行并通过其灰度值更新相应的RGB值。 将所有任务提交给池后我们在主线程中等待整个池的关闭然后使用ImageIO.write()方法将更新的BufferedImage写回到磁盘。 令人惊讶的是与不使用可用处理器的情况相比我们只需要多几行代码即可。 这再次显示了使用java.util.concurrent包的可用资源可以节省多少工作。 ForkJoinPool提供了三种不同的提交任务的方法 execute(ForkJoinTask) 此方法异步执行给定的任务。 它没有返回值。 invoke(ForkJoinTask) 此方法等待任务返回值。 submit(ForkJoinTask) 此方法异步执行给定的任务。 它返回对任务本身的引用。 因此任务引用可用于查询结果因为它实现了Future接口。 有了这些知识很清楚为什么我们要使用execute方法提交上述GrayscaleImageAction 。 如果我们改为使用invoke() 则主线程将等待任务完成而我们将不会利用可用的并行度。 仔细研究ForkJoinTask-API我们会发现相同的区别 ForkJoinTask.fork() ForkJoinTask是异步执行的。 它没有返回值。 ForkJoinTask.invoke() 立即执行ForkJoinTask并在完成后返回结果。 ForkJoinPool和ExecutorService 既然我们知道ExecutorService和ForkJoinPool 您可能会问自己为什么我们应该使用ForkJoinPool而不是ExecutorService 。 两者之间的差异不是很大。 两者都具有execute()和submit()方法并采用一些常见接口的实例例如Runnable Callable RecursiveAction或RecursiveTask 。 为了更好地理解这些区别让我们尝试使用ExecutorService从上面实现FindMin类 public class FindMinTask implements CallableInteger {private int[] numbers;private int startIndex;private int endIndex;private ExecutorService executorService;public FindMinTask(ExecutorService executorService, int[] numbers, int startIndex, int endIndex) {this.executorService executorService;this.numbers numbers;this.startIndex startIndex;this.endIndex endIndex;}public Integer call() throws Exception {int sliceLength (endIndex - startIndex) 1;if (sliceLength 2) {FindMinTask lowerFindMin new FindMinTask(executorService, numbers, startIndex, startIndex (sliceLength / 2) - 1);FutureInteger futureLowerFindMin executorService.submit(lowerFindMin);FindMinTask upperFindMin new FindMinTask(executorService, numbers, startIndex (sliceLength / 2), endIndex);FutureInteger futureUpperFindMin executorService.submit(upperFindMin);return Math.min(futureLowerFindMin.get(), futureUpperFindMin.get());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) throws InterruptedException, ExecutionException {int[] numbers new int[100];Random random new Random(System.currentTimeMillis());for (int i 0; i numbers.length; i) {numbers[i] random.nextInt(100);}ExecutorService executorService Executors.newFixedThreadPool(64);FutureInteger futureResult executorService.submit(new FindMinTask(executorService, numbers, 0, numbers.length-1));System.out.println(futureResult.get());executorService.shutdown();}
} 该代码看起来非常相似期望我们submit()任务submit()给ExecutorService 然后使用返回的Future实例来wait()结果。 两种实现之间的主要区别可以在构造线程池的那一点上找到。 在上面的示例中我们创建了一个具有64个线程的固定线程池。 为什么选择这么大的数字 原因是对每个返回的Future调用get()阻塞当前线程直到结果可用为止。 如果我们仅向可用池提供尽可能多的线程则程序将耗尽资源并无限期地挂起。 ForkJoinPool实现了已经提到的工作窃取策略即每次运行线程必须等待某些结果时 该线程从工作队列中删除当前任务并执行其他准备运行的任务。 这样当前线程不会被阻塞并且可以用来执行其他任务。 一旦计算出最初暂停的任务的结果该任务就会再次执行join方法将返回结果。 这与普通的ExecutorService有一个重要的区别在常规ExecutorService 您必须在等待结果时阻止当前线程。 翻译自: https://www.javacodegeeks.com/2015/09/forkjoin-framework.html叉叉框架