泗洪县城乡建设局网站,专业定制网站需要什么技能,网站建设完工确认书,内江做网站哪里便宜4.1 ScheduleThreadPoolExecutor介绍
从名字上就可以看出#xff0c;当前线程池是用于执行定时任务的线程池。 Java比较早的定时任务工具是Timer类。但是Timer问题很多#xff0c;串行的#xff0c;不靠谱#xff0c;会影响到其他的任务执行。 其实除了Timer以及ScheduleT…4.1 ScheduleThreadPoolExecutor介绍
从名字上就可以看出当前线程池是用于执行定时任务的线程池。 Java比较早的定时任务工具是Timer类。但是Timer问题很多串行的不靠谱会影响到其他的任务执行。 其实除了Timer以及ScheduleThreadPoolExecutor之外正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。 ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。
4.2 ScheduleThreadPoolExecutor应用
定时任务线程池的有参构造
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
发现ScheduleThreadPoolExecutor在构建时直接调用了父类的构造方法 ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor首先ScheduleThreadPoolExecutor最多允许设置3个参数 ● 核心线程数 ● 线程工厂 ● 拒绝策略 首先没有设置阻塞队列以及最大线程数和空闲时间以及单位 阻塞队列设置的是DelayedWorkQueue其实本质就是DelayQueue一个延迟队列。DelayQueue是一个无界队列。所以最大线 程数以及非核心线程的空闲时间是不需要设置的。 代码落地使用
public static void main(String[] args) {
//1. 构建定时任务线程池
ScheduledThreadPoolExecutor pool new ScheduledThreadPoolExecutor(
5,
new ThreadFactory() {
Override
public Thread newThread(Runnable r) {
Thread t new Thread(r);
return t;
}
},
new ThreadPoolExecutor.AbortPolicy());
//2. 应用ScheduledThreadPoolExecutor
// 跟直接执行线程池的execute没啥区别
pool.execute(() - {
System.out.println(execute);
});
// 指定延迟时间执行
System.out.println(System.currentTimeMillis());
pool.schedule(() - {
System.out.println(schedule);
System.out.println(System.currentTimeMillis());
},2, TimeUnit.SECONDS);
// 指定第一次的延迟时间并且确认后期的周期执行时间周期时间是在任务开始时就计算
// 周期性执行就是将执行完毕的任务再次社会好延迟时间并且重新扔到阻塞队列
// 计算的周期执行也是在原有的时间上做累加不关注任务的执行时长。
System.out.println(System.currentTimeMillis());
pool.scheduleAtFixedRate(() - {
System.out.println(scheduleAtFixedRate);
System.out.println(System.currentTimeMillis());
},2,3,TimeUnit.SECONDS);
// // 指定第一次的延迟时间并且确认后期的周期执行时间周期时间是在任务结束后再计算下次的延迟时间
System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() - {
System.out.println(scheduleWithFixedDelay);
System.out.println(System.currentTimeMillis());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},2,3,TimeUnit.SECONDS);
}
4.3 ScheduleThreadPoolExecutor源码剖析
4.3.1 核心属性
后面的方法业务流程会涉及到这些属性。
// 这里是针对任务取消时的一些业务判断会用到的标记
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown true;
private volatile boolean removeOnCancel false;
// 计数器如果两个任务的执行时间节点一模一样根据这个序列来判断谁先执行
private static final AtomicLong sequencer new AtomicLong();
// 这个方法是获取当前系统时间的毫秒值
final long now() {
return System.nanoTime();
}
// 内部类。核心类之一。
private class ScheduledFutureTaskV
extends FutureTaskV implements RunnableScheduledFutureV {
// 全局唯一的序列如果两个任务时间一直基于当前属性判断
private final long sequenceNumber;
// 任务执行的时间单位纳秒
private long time;
/**
* period 0执行一次的延迟任务
* period 0代表是At
* period 0代表是With
*/
private final long period;
// 周期性执行时需要将任务重新扔回阻塞队列基础当前属性拿到任务方便扔回阻塞队列
RunnableScheduledFutureV outerTask this;
/**
* 构建schedule方法的任务*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time ns;
this.period 0;
this.sequenceNumber sequencer.getAndIncrement();
}
/**
* 构建At和With任务的有参构造
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time ns;
this.period period;
this.sequenceNumber sequencer.getAndIncrement();
}
}
// 内部类。核心类之一。
static class DelayedWorkQueue extends AbstractQueueRunnable implements BlockingQueueRunnable {
// 这个类就是DelayQueue不用过分关注如果没看过看阻塞队列中的优先级队列和延迟队列
4.3.2 schedule方法
execute方法也是调用的schedule方法只不过传入的延迟时间是0纳秒 schedule方法就是将任务和延迟时间封装到一起并且将任务扔到阻塞队列中再去创建工作线程去take阻塞队列。
// 延迟任务执行的方法。
// command任务
// delay延迟时间
// unit延迟时间的单位
public ScheduledFuture? schedule(Runnable command, long delay, TimeUnit unit) {
// 健壮性校验。
if (command null || unit null)
throw new NullPointerException();
// 将任务和延迟时间封装到一起最终组成ScheduledFutureTask
// 要分成三个方法去看
// triggerTime计算延迟时间。最终返回的是当前系统时间 延迟时间
// triggerTime就是将延迟时间转换为纳秒并且当前系统时间再做一些健壮性校验
// ScheduledFutureTask有参构造将任务以及延迟时间封装到一起并且设置任务执行的方式
// decorateTask当前方式是让用户基于自身情况可以动态修改任务的一个扩展口
RunnableScheduledFuture? t decorateTask(command,
new ScheduledFutureTaskVoid(command, null,
triggerTime(delay, unit)));
// 任务封装好执行delayedExecute方法去执行任务
delayedExecute(t);
// 返回FutureTaskreturn t;
}
// triggerTime做的事情
// 外部方法对延迟时间做校验如果小于0就直接设置为0
// 并且转换为纳秒单位
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay 0) ? 0 : delay));
}
// 将延迟时间当前系统时间
// 后面的校验是为了避免延迟时间超过Long的取值范围
long triggerTime(long delay) {
return now() ((delay (Long.MAX_VALUE 1)) ? delay : overflowFree(delay));
}
// ScheduledFutureTask有参构造
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// time就是任务要执行的时间
this.time ns;
// period,为0代表任务是延迟执行不是周期执行
this.period 0;
// 基于AtmoicLong生成的序列
this.sequenceNumber sequencer.getAndIncrement();
}
// delayedExecute 执行延迟任务的操作
private void delayedExecute(RunnableScheduledFuture? task) {
// 查看当前线程池是否还是RUNNING状态如果不是RUNNING进到if
if (isShutdown())
// 不是RUNNING。
// 执行拒绝策略。
reject(task);
else {
// 线程池状态是RUNNING
// 直接让任务扔到延迟的阻塞队列中
super.getQueue().add(task);
// DCL的操作再次查看线程池状态
// 如果线程池在添加任务到阻塞队列后状态不是RUNNING
if (isShutdown()
// task.isPeriodic()现在反回的是false因为任务是延迟执行不是周期执行
// 默认情况延迟队列中的延迟任务可以执行
!canRunInCurrentRunState(task.isPeriodic())
// 从阻塞队列中移除任务。
remove(task))
task.cancel(false);
else
// 线程池状态正常任务可以执行
ensurePrestart();
}
}
// 线程池状态不为RUNNING查看任务是否可以执行
// 延迟执行periodicfalse
// 周期执行periodictrue
// continueExistingPeriodicTasksAfterShutdown周期执行任务默认为false
// executeExistingDelayedTasksAfterShutdown延迟执行任务默认为true
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 当前情况shutdownOK为true
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs runStateOf(ctl.get());
// 如果状态是RUNNING正常可以执行返回true
// 如果状态是SHUTDOWN根据shutdownOK来决定
return rs RUNNING || (rs SHUTDOWN shutdownOK);
}
// 任务可以正常执行后做的操作
void ensurePrestart() {
// 拿到工作线程个数
int wc workerCountOf(ctl.get());
// 如果工作线程个数小于核心线程数
if (wc corePoolSize)
// 添加核心线程去处理阻塞队列中的任务
addWorker(null, true);
else if (wc 0)// 如果工作线程数为0核心线程数也为0这是添加一个非核心线程去处理阻塞队列任务
addWorker(null, false);
}
4.3.3 At和With方法任务的run方法
这两个方法在源码层面上的第一个区别就是在计算周期时间时需要将这个值传递给period基于正负数在区别At和With 所以查看一个方法就ok查看At方法
// At方法
// command任务
// initialDelay第一次执行的延迟时间
// period任务的周期执行时间
// unit上面两个时间的单位
public ScheduledFuture? scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 健壮性校验
if (command null || unit null)
throw new NullPointerException();
// 周期时间不能小于等于0.
if (period 0)
throw new IllegalArgumentException();
// 将任务以及第一次的延迟时间和后续的周期时间封装好。
ScheduledFutureTaskVoid sft new ScheduledFutureTaskVoid(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 扩展口可以对任务做修改。
RunnableScheduledFutureVoid t decorateTask(command, sft);
// 周期性任务需要在任务执行完毕后重新扔会到阻塞队列为了方便拿任务将任务设置到outerTask成员变量中
sft.outerTask t;
// 和schedule方法一样的方式
// 如果任务刚刚扔到阻塞队列线程池状态变为SHUTDOWN默认情况当前任务不执行
delayedExecute(t);
return t;
}
// 延迟任务以及周期任务在执行时都会调用当前任务的run方法。
public void run() {
// periodic false一次性延迟任务
// periodic true周期任务
boolean periodic isPeriodic();
// 任务执行前会再次判断状态能否执行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 判断是周期执行还是一次性任务
else if (!periodic)
// 一次性任务让工作线程直接执行command的逻辑
ScheduledFutureTask.super.run();// 到这个else if说明任务是周期执行
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次任务执行的时间
setNextRunTime();
// 将任务重新扔回线程池做处理
reExecutePeriodic(outerTask);
}
}
// 设置下次任务执行的时间
private void setNextRunTime() {
// 拿到period值正数At负数With
long p period;
if (p 0)
// 拿着之前的执行时间直接追加上周期时间
time p;
else
// 如果走到else代表任务是With方式这种方式要重新计算延迟时间
// 拿到当前系统时间追加上延迟时间
time triggerTime(-p);
}
// 将任务重新扔回线程池做处理
void reExecutePeriodic(RunnableScheduledFuture? task) {
// 如果状态ok可以执行
if (canRunInCurrentRunState(true)) {
// 将任务扔到延迟队列
super.getQueue().add(task);
// DCL判断线程池状态if (!canRunInCurrentRunState(true) remove(task))
task.cancel(false);
else
// 添加工作线程
ensurePrestart();
}
}