青岛seo网站排名,wordpress5.0.4不支持,企业网站建设工作室,中国专业摄影网多线程写作类
倒计时协调器CountDownLatch
某个线程需要等待其他线程执行到特定操作结束即可。例如#xff1a;在多Web服务中#xff0c;在启动指定服务时需要启动若干启动过程中比较耗时的服务#xff0c;为了尽可能减少服务启动过程的总耗时#xff0c;该服务会使用专门…多线程写作类
倒计时协调器CountDownLatch
某个线程需要等待其他线程执行到特定操作结束即可。例如在多Web服务中在启动指定服务时需要启动若干启动过程中比较耗时的服务为了尽可能减少服务启动过程的总耗时该服务会使用专门的工程线程以并发的方式去启动这些服务。但是在这些服务启动完成后需要对这些启动的服务进行检查之后都启动完成后才可以启动指定服务。
注意点
确保所有CountDownLatch.()调用都位于代码中正确的位置。最好是放在finally中避免因线程出现异常导致导致该线程一直处于WAITING状态。等待线程在等待先决操作完成的时间指定一个时间限制CountDownLatch.await(long, TimeUnit)。在超过指定时间后在该时间CountDownLatch的计数器仍未到达0那么所有执行该实例的await方法的线程都会被唤醒。
栅栏CyclicBarrier
多个线程都到了某一点之后在一起运行。例如在web服务中我们需要保证多个关联服务都启动完成后确保在进行服务间的可以正常通信。
阻塞队列
ArrayBlockingQueue 适合在生产者线程和消费者线程之间的并发程度较大的情况下使用
LinkedBlockingQueue 适合在生产者线程和消费者线程之间的并发成度较低的情况下使用SynchronousQueue 适合在消费者处理能力与生产者处理能力差不多的情况下使用
限购
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;public class SemaphoreBasedChanneE implements ChannelE {private final BlockingQueueE queue;private final Semaphore semaphore;/***** param queue* 阻塞队列* param flowLimit* 流量县限制数*/public SemaphoreBasedChanne(BlockingQueueE queue, int flowLimit) {this(queue, false, flowLimit);}public SemaphoreBasedChanne(BlockingQueueE queue, boolean isFair, int flowLimit) {this.queue queue;this.semaphore new Semaphore(flowLimit, isFair); // 是否是公平锁}/*** 注意在代码中acquire与release总是配对出现的*/Overridepublic void put(E product) throws InterruptedException {semaphore.acquire();try {queue.put(product);} finally {// 最好放在finally中以免资源访问出现异常的情况下当前线程所获得配额无法返还semaphore.release();}}Overridepublic E take() throws InterruptedException {return queue.take();}
}/*** 对传输通道的抽象* * author Viscent Huang*/
public interface ChannelP {/*** 往传输通道中存入一个产品* * param product* 产品*/void put(P product) throws InterruptedException;/*** 从传输通道中取出一个产品* * return 产品*/P take() throws InterruptedException;
}管道线程间的直接输入与输出
PipedOutputStream和PipedInputStream是生产者-消费者模式的一个具体例子。可以看作一个线程的输出可作为另外一个线程的输入而不必借用文件、数据库、网络连接等其他数据交换中介。
双缓冲与Exchanger
线程中断
并发集合
快照是在Iterator实例呗创建的那一刻待遍历对象内部结构的一个只读副本它反映了待遍历集合的某一个时刻即Iterator实例呗创建的那一刻的状态。不同线程在读取数据时会获取到各自的快照因此这些快照相当于这些线程的线程的线程特有对象。所以这种方式下进行遍历操作的线程无须加锁就可以实现线程安全。另外由于快照是只读的因此这种遍历方式锁返回的Iterator实例是不支持remove方法的。优点 遍历操作和更新操作之间互不影响缺点当遍历的集合较大时创建快照的直接或者渐渐开销会比较大。准实时是指遍历操作不是针对待遍历对象的副本进行的但又不借助锁来保障线程安全从而使得遍历操作可以与更新操作并发进行。
几种线程安全集合类
非线层安全对象并发集合类共同接口遍历实现方式ArrayListCopyOnWriteArrayListList快照HashSetCopyOnWriteArraySetSet快照LinkedListConcurrentLinkedQueueQueue准实时HashMapConcurrentHashMapMap准实时TreeMapConcurrentSkipListMapSortedMap准实时TreeSetConcurrentSortedSet准实时
ConcurrentLinkedQueue是Queue接口的一个线程安全实现类他相当于LinkedList的线程安全版可以作为Collections.synchronizedListnew ArryaList()的代替品。其内部访问共享状态变量如队首和队尾指针的时候并不借助锁而是使用CAS操作来保障线程安全。因此ConcurrentLinkedQueue是非阻塞的其使用不会导致当前线程被暂停因此也就避免了上下文切换的开销。其更适合用于更新操作和遍历操作并发的场景例如生产者-消费者模式中生产者线程王队列中添加元素产品而消费者线程从队列中移除消费元素。ConcurrentHashMap是Map接口的一个线程安全实现类它相当于HashMap的线程安全版可以作为HashTable和Collections.synchronizedMap(new HashMap())的替代品。ConcurrentHashMap内部使用了颗粒度极小的锁来保障其线程安全。ConcurrentHashMap的读取操作基本上不会导致锁的使用。另外默认ConcurrentHashMap可以支持16个并发更新线程(并发数量可以通过concurrencyLevel来调节值越大开销越大越小可能导致并发更新产生锁的争用)即这些线程可以在不导致锁的争用情况下进行并发更新。因此ConcurrentHashMap可以支持比较高的并发性并且其锁的开销一般比较小。与HashTable的区别在于锁的力度不同HashTable在大多数方法上加锁导致而ConcurrentHashMap是进行分段加锁的。CopyOnWriteArrayList是List接口的一个线程安全实现类它相当于ArrayList的线程安全版本。其内部会维护一个实例变量用array用于引用一个数组。CopyOnWriteArrayList的更新操作是通过创建一个新的数组newArray并把老的数组的内容不知道newArray然后对newArray进行更新并将array引用指向newArray因此CopyOnWriteArrayList适用遍历操作元比更新操作频繁或者不希望在遍历的时候加锁的场景。而其他场景下我们仍然要考虑适用Collections.synchronizedList(new ArrayList())。CopyOnWriteArraySet是Set接口的一个线程安全实现类它相当于HashSet的线程安全版其内部使用一个CopyOnWriteArrayList实例因此CopyOnWriteArraySet的使用场景与CopyOnWriteArrayList相似。
死锁鹬蚌相争 死锁是一种在线程中常见活性故障如果两个或者更多的线程因相互等待而被永远暂停。
死锁的检测
使用jdk自带的Jconsole查看死锁的线程以及代码的位置信息。
线程管理
线程的未捕获异常与监控
当线程的run方法抛出未被捕获的异常run方法会退出相应的线程也提前终止。对于线程的这种异常终止做一些监控以及补偿。
import com.dengsheng.thread.util.Debug;
import com.dengsheng.thread.util.Tools;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;public class ThreadMonitorDemo {volatile boolean inited false;static int threadIndex 0;final static Logger LOGGER Logger.getAnonymousLogger();final BlockingQueueString channel new ArrayBlockingQueueString(100);public static void main(String[] args) throws InterruptedException {ThreadMonitorDemo demo new ThreadMonitorDemo();demo.init();for (int i 0; i 100; i) {demo.service(test- i);}Thread.sleep(2000);System.exit(0);}public synchronized void init() {if (inited) {return;}Debug.info(init...);WokrerThread t new WokrerThread();t.setName(Worker0- threadIndex);// 为线程t关联一个UncaughtExceptionHandlert.setUncaughtExceptionHandler(new ThreadMonitor());t.start();inited true;}public void service(String message) throws InterruptedException {channel.put(message);}private class ThreadMonitor implements Thread.UncaughtExceptionHandler {Overridepublic void uncaughtException(Thread t, Throwable e) {Debug.info(Current thread is t:%s, it is still alive:%s,Thread.currentThread() t, t.isAlive());// 将线程异常终止的相关信息记录到日志中String threadInfo t.getName();LOGGER.log(Level.SEVERE, threadInfo terminated:, e);// 创建并启动替代线程LOGGER.info(About to restart threadInfo);// 重置线程启动标记inited false;init();}}// 类ThreadMonitor定义结束private class WokrerThread extends Thread {Overridepublic void run() {Debug.info(Do something important...);String msg;try {for (;;) {msg channel.take();process(msg);}} catch (InterruptedException e) {// 什么也不做}}private void process(String message) {Debug.info(message);// 模拟随机性异常int i (int) (Math.random() * 100);if (i 2) {throw new RuntimeException(test);}Tools.randomPause(100);}}// 类ThreadMonitorDemo定义结束
}异步编程
从任务角度来看任务分为两种
同步Synchronous如同接力赛跑一个任务好比一个田径运动员比赛时一个运动从开始跑到下个队员位置下一个队员接着跑。异步Asychronous如同接力赛跑一个任务好比一个队伍比赛时队伍只负责跑至于结果需要等组委会给最后的结果。
Java中 假设我们用一个Runable实例task来表示一个任务我们直接调用task.run()来执行该任务此时这个任务就是同步任务如果使用new Thread(task).start()调用一个专门的工作者线程来执行该任务或者将该任务提交给Executor实例executor执行此时该任务为异步任务。
同步与异步是相对的它取决于任务的执行方式以及我们的观察角度。
ExecutorService的实现例
方法使用条件以及注意实现newCachedThreadPool适用于执行大量耗时较短且提交比较频繁的任务newFixedThreadPool此线程池核心线程数等于最大线程数所以线程不会超时关闭在使用该线程池的时候需要手动关闭该线程池释放资源newSingleThreadExecutor适用单多生产者-单消费者模式newScheduledThreadPool周期性线程池周期性执行队列中的任务多个线程newSingleThreadScheduledExecutor周期性线程池周期性执行队列中的任务单个线程
前三种类的是由 ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue)演化而来的。 newCachedThreadPool :: new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable()); 0最大核心数为0即SynchronousQueue中的第一队列任务会被执行而其他的需要等前面的执行完成后才执行。 Integer.MAX_VALUE线程池中最大的线程数当现在执行的线程数没有超过且没有空闲线程的时候线程池会立即启动一个新的线程执行任务。极端情况下在同时执行的任务数量小于Integer.MAX_VALUE时这些提交的任务都会启动一个新的线程此时就会造成线程的频繁切换拖累整个系统。同时还可能会造成OOM线程数过多需要的的内存量过大。 60L线程池中的线程等待时间超过60s后会自动销毁。 newFixedThreadPool::new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable())); 0L: 线程不会超时即线程不会被因超时而销毁。 nThreads: 核心线程数与最大线程数相同即线程池中的线程不会被销毁。当我们不再使用该线程池时应该手动关闭线程池。尽管FinalizableDelegatedExecutorService在jvm回收的时候会关闭该线程池但是还是及时关闭该线程池。 static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}protected void finalize() {super.shutdown();}
}newSingleThreadExecutor::new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable())) 0L: 线程不会超时即线程不会被因超时而销毁。 1: 最大核心数和最大线程数都只有一个所以只能有一个线程处理任务。例如往某个文件里面写多组数据每组数据获取时间不同先处理完的先写之后数据放到线程池里处理当然在满足效率的情况下既可以节省内存还可以避免一些没有必须要的问题例如死锁。 newScheduledThreadPool::new ScheduledThreadPoolExecutor(corePoolSize)和newSingleThreadScheduledExecutor::new ScheduledThreadPoolExecutor(1) newScheduledThreadPool 和 newSingleThreadScheduledExecutor都是周期性线程池。区别在于核心线程可能不同 schedule与scheduleWithFixedRate不太一样schedule当任务执行的时间超过周期的时间会导致后面的任务都会产生时间偏移
异步任务的批量执行CompletionService
没有见过暂时不学
java内存模型
高速缓存一致性
synchronized