网站风格主要包括,最新新闻消息事件,朗润装饰成都装修公司官网,网站赚钱一、前言 上周工作遇到了一个需求#xff0c;同步多个省份销号数据#xff0c;解绑微信粉丝。分省定时将销号数据放到SFTP服务器上#xff0c;我需要开发定时任务去解析文件。因为是多省份#xff0c;服务器、文件名规则、数据规则都不一定#xff0c;所以要做成可配置是有… 一、前言 上周工作遇到了一个需求同步多个省份销号数据解绑微信粉丝。分省定时将销号数据放到SFTP服务器上我需要开发定时任务去解析文件。因为是多省份服务器、文件名规则、数据规则都不一定所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置后台感知到后动态生成定时任务。 二、Springboot引入定时任务核心配置 Target(ElementType.TYPE)
Retention(RetentionPolicy.RUNTIME)
Import(SchedulingConfiguration.class)
Documented
public interface EnableScheduling {}Configuration
Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {Bean(name TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}} 接下来主要看一下这个核心后置处理器ScheduledAnnotationBeanPostProcessor 。 Override
public Object postProcessAfterInitialization(Object bean, String beanName) {if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}Class? targetClass AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { MapMethod, SetScheduled annotatedMethods MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookupSetScheduled) method - { SetScheduled scheduledMethods AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace(No Scheduled annotations found on bean class: targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) - scheduledMethods.forEach(scheduled - processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() Scheduled methods processed on bean beanName : annotatedMethods); } } } return bean; } 1、处理Scheduled注解通过ScheduledTaskRegistrar注册定时任务。 private void finishRegistration() {if (this.scheduler ! null) {this.registrar.setScheduler(this.scheduler);}if (this.beanFactory instanceof ListableBeanFactory) {MapString, SchedulingConfigurer beans ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);ListSchedulingConfigurer configurers new ArrayList(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}if (this.registrar.hasTasks() this.registrar.getScheduler() null) {Assert.state(this.beanFactory ! null, BeanFactory must be set to find scheduler by type);try {// Search for TaskScheduler bean...this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {logger.trace(Could not find unique TaskScheduler bean, ex);try {this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info(More than one TaskScheduler bean exists within the context, and none is named taskScheduler. Mark one of them as primary or name it taskScheduler (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.trace(Could not find default TaskScheduler bean, ex);// Search for ScheduledExecutorService bean next...try {this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {logger.trace(Could not find unique ScheduledExecutorService bean, ex2);try {this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {if (logger.isInfoEnabled()) {logger.info(More than one ScheduledExecutorService bean exists within the context, and none is named taskScheduler. Mark one of them as primary or name it taskScheduler (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: ex2.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex2) {logger.trace(Could not find default ScheduledExecutorService bean, ex2);// Giving up - falling back to default scheduler within the registrar...logger.info(No TaskScheduler/ScheduledExecutorService bean found for scheduled processing);}}}this.registrar.afterPropertiesSet();
} 1、通过一系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。 2、向ScheduledTaskRegistrar注册一个TaskScheduler用于对Runnable的任务进行调度它包含有多种触发规则。 3、registrar.afterPropertiesSet()在这开始安排所有的定时任务开始执行了。 protected void scheduleTasks() {if (this.taskScheduler null) {this.localExecutor Executors.newSingleThreadScheduledExecutor();this.taskScheduler new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks ! null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks ! null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks ! null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks ! null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}
} 1、TriggerTask动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下文确定下一个执行时间。 2、CronTask动态定时任务TriggerTask子类。通过cron表达式确定的时间触发下一个任务执行。 3、IntervalTask一定时间延迟之后周期性执行的任务。 4、taskScheduler 如果为空默认是ConcurrentTaskScheduler并使用默认单线程的ScheduledExecutor。 三、主要看一下CronTask工作原理 ScheduledTaskRegistrar.java
Nullable
public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask this.unresolvedTasks.remove(task);boolean newTask false;if (scheduledTask null) {scheduledTask new ScheduledTask(task);newTask true;}if (this.taskScheduler ! null) {scheduledTask.future this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}ConcurrentTaskScheduler.java
Override
Nullable
public ScheduledFuture? schedule(Runnable task, Trigger trigger) {try {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);}else {ErrorHandler errorHandler (this.errorHandler ! null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();}}catch (RejectedExecutionException ex) {throw new TaskRejectedException(Executor [ this.scheduledExecutor ] did not accept task: task, ex);}
}ReschedulingRunnable.java
Nullable
public ScheduledFuture? schedule() {synchronized (this.triggerContextMonitor) {this.scheduledExecutionTime this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime null) {return null;}long initialDelay this.scheduledExecutionTime.getTime() - System.currentTimeMillis();this.currentFuture this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}
}private ScheduledFuture? obtainCurrentFuture() {Assert.state(this.currentFuture ! null, No scheduled future);return this.currentFuture;
}Override
public void run() {Date actualExecutionTime new Date();super.run();Date completionTime new Date();synchronized (this.triggerContextMonitor) {Assert.state(this.scheduledExecutionTime ! null, No scheduled execution);this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);if (!obtainCurrentFuture().isCancelled()) {schedule();}}
} 1、最终将task和trigger都封装到了ReschedulingRunnable中。 2、ReschedulingRunnable实现了任务重复调度schedule方法中调用调度器executor并传入自身对象executor会调用run方法run方法又调用了schedule方法。 3、ReschedulingRunnable schedule方法加了同步锁只能有一个线程拿到下次执行时间并加入执行器的调度。 4、不同的ReschedulingRunnable对象之间在线程池够用的情况下是不会相互影响的也就是说满足线程池的条件下TaskScheduler的schedule方法的多次调用是可以交叉执行的。 ScheduledThreadPoolExecutor.java
public ScheduledFuture? schedule(Runnable command,long delay,TimeUnit unit) {if (command null || unit null)throw new NullPointerException();RunnableScheduledFuture? t decorateTask(command,new ScheduledFutureTaskVoid(command, null,triggerTime(delay, unit)));delayedExecute(t);return t;
}private void delayedExecute(RunnableScheduledFuture? task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() !canRunInCurrentRunState(task.isPeriodic()) remove(task))task.cancel(false);elseensurePrestart();}
} ScheduledFutureTask 工作原理如下图所示【太懒了不想画图了盗图一张】。 1、ScheduledFutureTask会放入优先阻塞队列ScheduledThreadPoolExecutor.DelayedWorkQueue二叉最小堆实现 2、上图中的Thread对象即ThreadPoolExecutor.Worker实现了Runnable接口 /*** Creates with given first task and thread from ThreadFactory.* param firstTask the first task (null if none)*/
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);
}/** Delegates main run loop to outer runWorker */
public void run() {runWorker(this);
} 1、Worker中维护了Thread对象Thread对象的Runnable实例即Worker自身 2、ThreadPoolExecutor#addWorker方法中会创建Worker对象然后拿到Worker中的thread实例并start这样就创建了线程池中的一个线程实例 3、Worker的run方法会调用ThreadPoolExecutor#runWorker方法这才是任务最终被执行的地方该方法示意如下 1首先取传入的task执行如果task是null只要该线程池处于运行状态就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向 workQueue队列中offer任务。getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束返回的是nullrunWorker循环结束执行processWorkerExit方法。至此该线程结束自己的使命从线程池中“消失”。 2在开始执行任务之前会调用Worker的lock方法目的是阻止task正在被执行的时候被interrupt通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法)该线程没有自己的interrupt set了。 3beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作这两个方法是空的留给继承类去填充功能。我们可以在beforeExecute方法中抛出异常这样task不会被执行而且在跳出该循环的时候completedAbruptly的值是true表示the worker died due to user exception会用decrementWorkerCount调整wc。 4因为Runnable的run方法不能抛出Throwables异常所以这里重新包装异常然后抛出抛出的异常会使当当前线程死掉可以在afterExecute中对异常做一些处理。 5afterExecute方法也可能抛出异常也可能使当前线程死掉。 四、动态创建定时任务 TaskConfiguration 配置类 Configuration
EnableScheduling
Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {Bean(name ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix pool- poolNumber.getAndIncrement() -schedule-;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(group, r,namePrefix threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() ! Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}
} 1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor而是corePoolSize5的线程池 2、自定义线程池工厂类 DynamicTask 动态定时任务 Configuration
public class DynamicTask implements SchedulingConfigurer {private static Logger LOGGER LoggerFactory.getLogger(DynamicTask.class);private static final ExecutorService es new ThreadPoolExecutor(10, 20,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),new DynamicTaskConsumeThreadFactory());private volatile ScheduledTaskRegistrar registrar;private final ConcurrentHashMapString, ScheduledFuture? scheduledFutures new ConcurrentHashMap();private final ConcurrentHashMapString, CronTask cronTasks new ConcurrentHashMap();private volatile ListTaskConstant taskConstants Lists.newArrayList();Overridepublic void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar registrar;this.registrar.addTriggerTask(() - {if (!CollectionUtils.isEmpty(taskConstants)) {LOGGER.info(检测动态定时任务列表...);ListTimingTask tts new ArrayList();taskConstants.forEach(taskConstant - {TimingTask tt new TimingTask();tt.setExpression(taskConstant.getCron());tt.setTaskId(dynamic-task- taskConstant.getTaskId());tts.add(tt);});this.refreshTasks(tts);}}, triggerContext - new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));}public ListTaskConstant getTaskConstants() {return taskConstants;}private void refreshTasks(ListTimingTask tasks) {//取消已经删除的策略任务SetString taskIds scheduledFutures.keySet();for (String taskId : taskIds) {if (!exists(tasks, taskId)) {scheduledFutures.get(taskId).cancel(false);}}for (TimingTask tt : tasks) {String expression tt.getExpression();if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {LOGGER.error(定时任务DynamicTask cron表达式不合法: expression);continue;}//如果配置一致则不需要重新创建定时任务if (scheduledFutures.containsKey(tt.getTaskId()) cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {continue;}//如果策略执行时间发生了变化则取消当前策略的任务if (scheduledFutures.containsKey(tt.getTaskId())) {scheduledFutures.remove(tt.getTaskId()).cancel(false);cronTasks.remove(tt.getTaskId());}CronTask task new CronTask(tt, expression);ScheduledFuture? future registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());cronTasks.put(tt.getTaskId(), task);scheduledFutures.put(tt.getTaskId(), future);}}private boolean exists(ListTimingTask tasks, String taskId) {for (TimingTask task : tasks) {if (task.getTaskId().equals(taskId)) {return true;}}return false;}PreDestroypublic void destroy() {this.registrar.destroy();}public static class TaskConstant {private String cron;private String taskId;public String getCron() {return cron;}public void setCron(String cron) {this.cron cron;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId taskId;}}private class TimingTask implements Runnable {private String expression;private String taskId;public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId taskId;}Overridepublic void run() {//设置队列大小10LOGGER.error(当前CronTask: this);DynamicBlockingQueue queue new DynamicBlockingQueue(3);es.submit(() - {while (!queue.isDone() || !queue.isEmpty()) {try {String content queue.poll(500, TimeUnit.MILLISECONDS);if (StringUtils.isBlank(content)) {return;}LOGGER.info(DynamicBlockingQueue 消费 content);TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}});//队列放入数据for (int i 0; i 5; i) {try {queue.put(String.valueOf(i));LOGGER.info(DynamicBlockingQueue 生产 i);} catch (InterruptedException e) {e.printStackTrace();}}queue.setDone(true);}public String getExpression() {return expression;}public void setExpression(String expression) {this.expression expression;}Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);}}/*** 队列消费线程工厂类*/private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;DynamicTaskConsumeThreadFactory() {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix pool- poolNumber.getAndIncrement() -dynamic-task-;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(group, r,namePrefix threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() ! Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}private static class DynamicBlockingQueue extends LinkedBlockingQueueString {DynamicBlockingQueue(int capacity) {super(capacity);}private volatile boolean done false;public boolean isDone() {return done;}public void setDone(boolean done) {this.done done;}}
} 1、taskConstants 动态任务列表 2、ScheduledTaskRegistrar#addTriggerTask 添加动态周期定时任务检测动态任务列表的变化 CronTask task new CronTask(tt, expression);
ScheduledFuture? future registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);3、动态创建cron定时任务拿到ScheduledFuture实例并缓存起来 4、在刷新任务列表时通过缓存的ScheduledFuture实例和CronTask实例来决定是否取消、移除失效的动态定时任务。 DynamicTaskTest 动态定时任务测试类 RunWith(SpringRunner.class)
SpringBootTest
public class DynamicTaskTest {Autowiredprivate DynamicTask dynamicTask;Testpublic void test() throws InterruptedException {ListDynamicTask.TaskConstant taskConstans dynamicTask.getTaskConstants();DynamicTask.TaskConstant taskConstant new DynamicTask.TaskConstant();taskConstant.setCron(0/5 * * * * ?);taskConstant.setTaskId(test1);taskConstans.add(taskConstant);DynamicTask.TaskConstant taskConstant1 new DynamicTask.TaskConstant();taskConstant1.setCron(0/5 * * * * ?);taskConstant1.setTaskId(test2);taskConstans.add(taskConstant1);DynamicTask.TaskConstant taskConstant2 new DynamicTask.TaskConstant();taskConstant2.setCron(0/5 * * * * ?);taskConstant2.setTaskId(test3);taskConstans.add(taskConstant2);TimeUnit.SECONDS.sleep(40);//移除并添加新的配置taskConstans.remove(taskConstans.size() - 1);DynamicTask.TaskConstant taskConstant3 new DynamicTask.TaskConstant();taskConstant3.setCron(0/5 * * * * ?);taskConstant3.setTaskId(test4);taskConstans.add(taskConstant3);
//TimeUnit.MINUTES.sleep(50);}
} 转载于:https://www.cnblogs.com/hujunzheng/p/10353390.html