网站创建数据库,wordpress动漫模板,伊宁网站建设,wordpress get author linkJDK的java.util.concurrent.ThreadPoolExecutor允许您将任务提交到线程池#xff0c;并使用BlockingQueue来保存提交的任务。 如果您要提交数千个任务#xff0c;请指定一个“绑定”队列#xff08;即最大容量的队列#xff09;#xff0c;否则JVM可能会耗尽内存。 您可以… JDK的java.util.concurrent.ThreadPoolExecutor允许您将任务提交到线程池并使用BlockingQueue来保存提交的任务。 如果您要提交数千个任务请指定一个“绑定”队列即最大容量的队列否则JVM可能会耗尽内存。 您可以设置RejectedExecutionHandler来处理队列已满时发生的情况但是仍然有待提交的任务。 这里是你展示如何使用一个简单的例子ThreadPoolExecutor具有BlockingQueue容量1000 CallerRunsPolicy确保当队列已满时其他任务将由提交线程处理。 int numThreads 5;
ExecutorService exec new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueueRunnable(1000),new ThreadPoolExecutor.CallerRunsPolicy()); 这种方法的问题在于当队列已满时向池提交任务的线程会变得忙于执行任务本身在此期间队列可能会变空并且池中的线程可能会变得空闲。 这不是很有效。 我们希望一直保持线程池繁忙并且工作队列始终处于饱和状态。 有多种解决方案。 其中之一是使用自定义的Executor 该Executor在队列已满时会阻塞从而阻止其他任务提交到池。 BlockingExecutor的代码如下所示。 它基于Brian Goetz2006年的BoundedExecutor示例。Java Concurrency in Practice。 1版。 Addison-Wesley专业。 第8.3.3节 。 import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** An executor which blocks and prevents further tasks from* being submitted to the pool when the queue is full.* p* Based on the BoundedExecutor example in:* Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)*/
public class BlockingExecutor extends ThreadPoolExecutor {private static final Logger LOGGER LoggerFactory.getLogger(BlockingExecutor.class);private final Semaphore semaphore;/*** Creates a BlockingExecutor which will block and prevent further* submission to the pool when the specified queue size has been reached.** param poolSize the number of the threads in the pool* param queueSize the size of the queue*/public BlockingExecutor(final int poolSize, final int queueSize) {super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());// the semaphore is bounding both the number of tasks currently executing// and those queued upsemaphore new Semaphore(poolSize queueSize);}/*** Executes the given task.* This method will block when the semaphore has no permits* i.e. when the queue has reached its capacity.*/Overridepublic void execute(final Runnable task) {boolean acquired false;do {try {semaphore.acquire();acquired true;} catch (final InterruptedException e) {LOGGER.warn(InterruptedException whilst aquiring semaphore, e);}} while (!acquired);try {super.execute(task);} catch (final RejectedExecutionException e) {semaphore.release();throw e;}}/*** Method invoked upon completion of execution of the given Runnable,* by the thread that executed the task.* Releases a semaphore permit.*/Overrideprotected void afterExecute(final Runnable r, final Throwable t) {super.afterExecute(r, t);semaphore.release();}
} 参考我们的JCG合作伙伴 Fahd Shariff在fahd.blog博客上使用BlockingExecutor进行节流任务提交 。 翻译自: https://www.javacodegeeks.com/2013/11/throttling-task-submission-with-a-blockingexecutor-2.html