当前位置: 首页 > news >正文

自己可以做英文网站么比较好的wordpress插件

自己可以做英文网站么,比较好的wordpress插件,人力资源网,黑龙江牡安建设有限公司网站分布式定时任务调度系统 流程分析 一个分布式定时任务#xff0c;需要具备有以下几点功能#xff1a; 核心功能#xff1a;定时调度、任务管理、可观测日志高可用#xff1a;集群、分片、失败处理高性能#xff1a;分布式锁扩展功能#xff1a;可视化运维、多语言、任…分布式定时任务调度系统 流程分析 一个分布式定时任务需要具备有以下几点功能 核心功能定时调度、任务管理、可观测日志高可用集群、分片、失败处理高性能分布式锁扩展功能可视化运维、多语言、任务编排 在分布式环境下一般会将定时任务拆解为任务调度部分和任务执行部分各司其职。 调度中心就是总览全局的Leader具体的执行器就是需要工作的worker。Leader分配任务worker负责任务执行。那么Leader会等worker执行完任务再去干其他事情吗显然不行这样效率太低了。 Leader时间到了你去执行任务 Worker收到我马上执行任务完成给你反馈不用一直等我。。。 Leader任务执行完了 Worker收到 Worker执行器挂了任务也标记失败吧。。。还得报告上级任务失败了。。 核心问题 任务如何触发触发失败的处理逻辑任务如何执行任务结果如何反馈反馈回调失败处理逻辑任务日志查看任务失败判断逻辑与依据任务失败后告警提示如何保证高可用集群如何搭建调度和执行之间的通信和心跳 同类产品对比 QuartZxxl-jobSchedulerX 2.0PowerJob定时类型CRONCRONCRON、固定频率、固定延迟、OpenAPICRON、固定频率、固定延迟、OpenAPI任务类型内置Java内置Java、GLUE Java、Shell、Python等脚本内置Java、外置JavaFatJar、Shell、Python等脚本内置Java、外置Java容器、Shell、Python等脚本分布式任务无静态分片MapReduce 动态分片MapReduce 动态分片在线任务治理不支持支持支持支持日志白屏化不支持支持不支持支持调度方式及性能基于数据库锁有性能瓶颈基于数据库锁有性能瓶颈不详无锁化设计性能强劲无上限报警监控无邮件短信邮件提供接口允许开发者扩展系统依赖关系型数据库MySQL、Oracle…MySQL人民币任意 Spring Data Jpa支持的关系型数据库MySQL、Oracle…DAG 工作流不支持不支持支持支持 数据来源于PowerJobhttps://www.yuque.com/powerjob/guidence/intro XXL-JOB相关概念 调度中心xxl-job-admin ;统一管理任务调度平台上调度任务负责触发调度执行并且提供任务管理平台。 执行器负责接收调度中心的调度并执行可直接部署执行器也可以将执行器集成到现有业务项目中。 XXL-JOB系统架构 逻辑架构 数据架构 xxl-job调度中心数据表 - xxl_job_lock任务调度锁表 - xxl_job_group执行器信息表维护任务执行器信息 - xxl_job_info调度扩展信息表 用于保存XXL-JOB调度任务的扩展信息如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等 - xxl_job_log调度日志表 用于保存XXL-JOB任务调度的历史信息如调度结果、执行结果、调度入参、调度机器和执行器等等 - xxl_job_log_report调度日志报表用户存储XXL-JOB任务调度日志的报表调度中心报表功能页面会用到 - xxl_job_logglue任务GLUE日志用于保存GLUE更新历史用于支持GLUE的版本回溯功能 - xxl_job_registry执行器注册表维护在线的执行器和调度中心机器地址信息 - xxl_job_user系统用户表核心表E-R图 执行流程 整体执行流程 执行器自动注册到调度中心30秒执行一次用于心跳检查。执行器销毁会取消注册。调度中心根据触发时间触发调度任务。执行器通过任务执行线程池执行任务并记录执行日志执行结果异步上报。调度中心日志请求。 执行流程细化 执行器注册 服务启动后ExecutorRegistryThread通过registryThread注册线程每30s向调度中心注册一次。服务销毁后ExecutorRegistryThread调用一次取消注册接口从调度中心删除当前节点的注册信息。JobRegistryHelper内部维护了一个registryOrRemoveThreadPool注册或者删除线程池用于处理执行器客户端发送的注册或者删除请求同时更新调度中心的执行器地址信息。JobRegistryHelper内部为了一个registryMonitorThread注册监听线程每30s执行一次与客户端注册频率一致用于监听超过90s未主动注册的节点地址。超过90s就认为节点掉线。 调度中心任务调度 JobScheduleHelper主要负责任务调度逻辑判断与执行调度。内部维护了来个线程来执行任务调度。scheduleThread调度线程主要负责扫描任务将能够执行的任务放入时间轮并计算下一次执行时间。ringThread时间轮线程主要处理时间轮中的任务调用JobTriggerPoolHelper进行任务触发。JobTriggerPoolHelper任务触发线程由快慢线程池组成根据任务触发时间来进行切换选择由哪一个线程池触发任务。任务触发器根据任务信息组装触发参数包括基本信息和阻塞策略任务触发器根据任务配置的路由策略进行路由寻址然后通过远程调用进行任务触发。XxlJobTrigger主要负责任务触发执行动作。ExecutorBizClient是ExecutorBiz接口的客户端sdk实现在调度中心使用相当于执行器的sdk调用执行器的Rest接口使用。同理ExecutorBizImpl就是ExecutorBiz执行器业务逻辑实现。调度中心的http服务就是Spring Boot实现的JobApiController层 执行器执行任务 执行器中的http服务是通过netty搭建的。ExecutorBizImpl接收到触发任务后先根据阻塞策略和任务类型进行必要参数组装组装完成后交给XxlJobExecutor处理XxlJobExecutor通过registJobThread()方法获取执行线程同时启动线程然后将触发任务信息放入任务队列由线程消费处理。JobThread任务线程负责执行任务记录执行日志到**文件**任务执行完毕后将结果推送到TriggerCallbackThread的callBackQueue回调队列中由TriggerCallbackThread负责任务结果回调。TriggerCallbackThread主要负责任务执行结果回调将执行结果反馈给调度中心。TriggerCallbackThread内部维护了triggerCallbackThread和triggerRetryCallbackThread两个线程。triggerCallbackThread负责处理callBackQueue队列中的数据回调失败将回调参数记录到回调日志文件中一直执行。triggerRetryCallbackThread主要对回调失败的数据进行重试每30s执行一次主要动作将回调日志读取出来反序列化后执行调用。 调度中心任务结果处理 AdminBizImpl基本没做复杂逻辑接收到客户端发送的回调结果后直接交给JobCompleteHelper处理。JobCompleteHelper负责对任务执行结果处理内部维护了一个线程池和一个线程。callbackThreadPool线程池主要负责异步处理执行结果。monitorThread主要处理未收到回调的任务60s执行一次判断条件①任务状态处于运行中超过10min 并且 ②执行器不在线。也就是说在线的执行器任务执行超过10min不会标记为失败。 服务端启动流程 服务端执行时序图 主要流程 任务执行调度器负责计算任务是否需要执行将需要执行的任务添加到任务触发线程池中任务触发器由快慢线程池组成根据任务触发时间来进行切换选择由哪一个线程池触发任务。任务触发器根据任务信息组装触发参数包括基本信息和阻塞策略任务触发器根据任务配置的路由策略进行路由寻址然后通过远程调用进行任务触发。 初始化 首先找到配置类 XxlJobAdminConfig。该类实现InitializingBean接口和DisposableBean接口主要用于xxl-job-admin初始化和销毁动作。 afterPropertiesSet执行初始化操作 /*** 在Bean对象属性填充完成后调用*/ Override public void afterPropertiesSet() throws Exception {// 利用静态声明的只会加载一次的特性初始化一个单例对象。adminConfig this;// 初始化xxl-job调度器xxlJobScheduler new XxlJobScheduler();xxlJobScheduler.init(); }com.xxl.job.admin.core.scheduler.XxlJobScheduler#init初始化xxl-job调度器: public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start -- 初始化触发器线程池JobTriggerPoolHelper.toStart();// admin registry monitor run -- 30秒执行一次,维护注册表信息判断在线超时时间90s RegistryConfig类中配置JobRegistryHelper.getInstance().start();// admin fail-monitor run -- 运行失败监视器主要失败发送邮箱重试触发器JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper ) -- 任务结果处理包括执行器正常回调和任务结果丢失处理// 调度记录停留在 运行中 状态超过10min且对应执行器心跳注册失败不在线则将本地调度主动标记失败JobCompleteHelper.getInstance().start();// admin log report start -- 统计一些失败成功报表JobLogReportHelper.getInstance().start();// start-schedule ( depend on JobTriggerPoolHelper ) -- 执行调度器JobScheduleHelper.getInstance().start();logger.info( init xxl-job admin success.); }该方法主要做了如下事情 init i18n初始化触发器线程池维护注册表信息(30秒执行一次)保持心跳将丢失主机信息调度日志更改状态统计一些失败成功报表,删除过期日志执行调度器 具体流程 I. 初始化i18n 主要是针对ExecutorBlockStrategyEnum枚举的title属性进行国际化赋值处理 private void initI18n() {// 枚举都是单例的初始化调用一次赋值后即可for (ExecutorBlockStrategyEnum item : ExecutorBlockStrategyEnum.values()) {// SERIAL_EXECUTION单机串行// DISCARD_LATER丢弃后续调度// COVER_EARLY覆盖之前调度item.setTitle(I18nUtil.getString(jobconf_block_.concat(item.name())));} }II. 初始化触发器线程池【JobTriggerPoolHelper快慢线程池】 JobTriggerPoolHelper主要维护了两个线程池。 主要由JobTriggerPoolHelper类完成触发器线程池的初始化 /*** 初始化* 调度器启动时初始化了两个线程池除了慢线程池的队列大一些以及最大线程数由用户自定义以外其他配置都一致。* 快线程池用于处理时间短的任务慢线程池用于处理时间长的任务*/ public void start() {// 核心线程数10最大线程数来自配置存活时间为60s队列大小1000线程工厂配置线程名。拒绝策略为AbortPolicy直接抛出异常fastTriggerPool new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(1000),r - new Thread(r, xxl-job, admin JobTriggerPoolHelper-fastTriggerPool- r.hashCode()));// 慢线程池初始化 触发的任务在一分钟内超时10次则采用慢触发器执行。拒绝策略为AbortPolicy直接抛出异常slowTriggerPool new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue(2000),r - new Thread(r, xxl-job, admin JobTriggerPoolHelper-slowTriggerPool- r.hashCode())); }注意这里分别初始化了2个线程池一个快一个慢优先选择快当一分钟以内任务触发时间超时10次【超时时间为500ms】则加入慢线程池执行。 III. 维护注册表信息【JobRegistryHelper】(30秒执行一次) JobRegistryHelper#start主要完成3件事情 初始化注册或者删除线程池主要负责客户端注册或者销毁到xxl_job_registry表异步处理调度中心的api为com.xxl.job.admin.controller.JobApiController初始化守护线程每30秒执行一次。 从xxl_job_registry中删除超时的机器更新xxl_job_group执行器地址列表 /*** 初始化*/ public void start() {// for registry or remove -- 注册或者删除线程池初始化拒绝策略是由父线程执行同时会打印日志registryOrRemoveThreadPool new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(2000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool- r.hashCode());}},new RejectedExecutionHandler() {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 注意这里是父线程执行任务r.run();logger.warn( xxl-job, registry or remove too fast, match threadpool rejected handler(run now).);}});// for monitor -- 注册监控线程 30秒【sleep】执行一次,维护注册表信息 判断在线超时时间90sregistryMonitorThread new Thread(new Runnable() {Overridepublic void run() {while (!toStop) {try {// auto registry group -- 查询任务组数据。对应xxl-job-group表有数据时校验自动任务执行器注册信息ListXxlJobGroup groupList XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList ! null !groupList.isEmpty()) {// remove dead address (admin/executor) -- 从xxl-job-registry中删除超时90s的机器,不分是否自动注册ListInteger ids XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids ! null ids.size() 0) {// 移除超时掉线的执行器。执行器的更新时间通过com.xxl.job.core.biz.AdminBiz.registry完成也就是执行器和admin之间的心跳XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor) -- 从xxl-job-registry中获取执行器地址刷新到xxl-job-group中。刷新在线地址 包含执行器注册的和adminHashMapString, ListString appAddressMap new HashMap();// 查询更新时间大于当前时间-90s的数据ListXxlJobRegistry list XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list ! null) {for (XxlJobRegistry item : list) {if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {// group表的appname对应registry表的registrykey字段String appname item.getRegistryKey();ListString registryList appAddressMap.get(appname);if (registryList null) {registryList new ArrayListString();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// fresh group addressfor (XxlJobGroup group : groupList) {ListString registryList appAddressMap.get(group.getAppname());String addressListStr null;if (registryList ! null !registryList.isEmpty()) {// 对地址进行排序Collections.sort(registryList);// 用逗号分隔 http:127.0.0.1:9092/,http://127.0.0.1:9903/StringBuilder addressListSB new StringBuilder();for (String item : registryList) {addressListSB.append(item).append(,);}addressListStr addressListSB.toString();addressListStr addressListStr.substring(0, addressListStr.length() - 1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());// 更新xxl-job-group中的数据。注册信息中没有数据也会执行更新将执行器地址更新为空XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job registry monitor thread error:, e);}}try {// 30s执行一次通过sleep实现TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error( xxl-job, job registry monitor thread error:, e);}}}logger.info( xxl-job, job registry monitor thread stop);}});// 守护线程registryMonitorThread.setDaemon(true);registryMonitorThread.setName(xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread);registryMonitorThread.start(); } IV. 运行失败监视器【JobFailMonitorHelper】失败重试告警邮件 JobFailMonitorHelper主要是失败任务重试以及告警消息发送 失败重试 这里判断失败有2种情况(trigger_code表示任务触发状态handle_code表示任务执行结果状态200均表示成功500表示失败) 第一种trigger_code!(0,200) 且 handle_code!0 第二种handle_code!200 告警(这里可向spring注入JobAlarm)可自定义扩展 JobFailMonitorHelper内部初始化了一个守护线程monitorThread用于检测失败任务并根据配置的重试规则进行重试和告警。 /*** 初始化任务失败监听类* p* 线程每10秒执行1次*/ public void start() {monitorThread new Thread(new Runnable() {Overridepublic void run() {// monitorwhile (!toStop) {try {// 查询 1000 条失败任务ListLong failLogIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (CollUtil.isNotEmpty(failLogIds)) {for (long failLogId : failLogIds) {// lock log -- 加锁乐观修锁改alarm_status-1int lockRet XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet 1) {continue;}XxlJobLog log XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);XxlJobInfo info XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor 失败重试if (log.getExecutorFailRetryCount() 0) {// 执行重新触发操作JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY,(log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(), log.getExecutorParam(), null);// 追加日志String retryMsg brbrspan style\color:#F39C12;\ I18nUtil.getString(jobconf_trigger_type_retry) /spanbr;log.setTriggerMsg(log.getTriggerMsg() retryMsg);XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitor 任务失败就告警int newAlarmStatus 0; // 告警状态0-默认、-1锁定状态、1-无需告警、2-告警成功、3-告警失败if (info ! null) {// 发送告警并获取发生送结果boolean alarmResult XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);logger.debug( xxl-job 任务执行失败发送告警信息jobId:{},重试次数{}, info.getId(), log.getExecutorFailRetryCount());newAlarmStatus alarmResult ? 2 : 3;} else {newAlarmStatus 1;}XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job fail monitor thread error:, e);}}try {// 10秒执行一次TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, job fail monitor thread stop);}});// 守护线程monitorThread.setDaemon(true);monitorThread.setName(xxl-job, admin JobFailMonitorHelper);monitorThread.start(); }其中XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);告警的发送可以实现自定义。即实现JobAlarm接口并注入Spring即可。 Component public class JobAlarmer implements ApplicationContextAware, InitializingBean {private static Logger logger LoggerFactory.getLogger(JobAlarmer.class);private ApplicationContext applicationContext;private ListJobAlarm jobAlarmList;Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}Overridepublic void afterPropertiesSet() throws Exception {MapString, JobAlarm serviceBeanMap applicationContext.getBeansOfType(JobAlarm.class);if (MapUtil.isNotEmpty(serviceBeanMap)) {jobAlarmList new ArrayList(serviceBeanMap.values());}}/*** job alarm** param info 任务信息* param jobLog 任务日志* return 告警结果*/public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {boolean result false;if (jobAlarmList ! null jobAlarmList.size() 0) {result true; // success means all-successfor (JobAlarm alarm : jobAlarmList) {boolean resultItem false;try {resultItem alarm.doAlarm(info, jobLog);} catch (Exception e) {logger.error(e.getMessage(), e);}if (!resultItem) {result false;}}}return result;}}V. 任务结果处理【JobCompleteHelper】 主要职责 初始化线程池和守护线程守护线程每60秒执行一次将执行器客户端失联的任务状态标记为完成【两个条件a.超过10分钟都处于运行中b.失联】线程池主要用于异步处理执行器的任务结果回调 /*** 初始化*/ public void start() {// for callback -- 回调线程callbackThreadPool new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueue(3000),r - new Thread(r, xxl-job, admin JobLosedMonitorHelper-callbackThreadPool- r.hashCode()),(r, executor) - {// 超过最大数量后父线程执行任务r.run();log.warn( xxl-job, callback too fast, match threadpool rejected handler(run now).);});// for monitor - 监听线程。每60秒执行一次monitorThread new Thread(new Runnable() {Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {// 首次运行暂停50毫秒目的是为了让JobTriggerPoolHelper先初始化完成TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {log.error(e.getMessage(), e);}}// monitor -- 监听while (!toStop) {try {// 任务结果丢失处理调度记录停留在 运行中 状态超过10min且对应执行器心跳注册失败不在线则将本地调度主动标记失败// 两个条件1.运行中状态超过10min 2.心跳不在线Date losedTime DateUtil.addMinutes(new Date(), -10);ListLong losedJobIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);if (CollUtil.isNotEmpty(losedJobIds)) {for (Long logId : losedJobIds) {XxlJobLog jobLog new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);// 任务结果丢失标记失败jobLog.setHandleMsg(I18nUtil.getString(joblog_lost_fail));XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {log.error( xxl-job, job fail monitor thread error:, e);}}try {// 每60秒执行一次TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {log.error(e.getMessage(), e);}}}log.info( xxl-job, JobLosedMonitorHelper stop);}});// 守护线程monitorThread.setDaemon(true);monitorThread.setName(xxl-job, admin JobLosedMonitorHelper);monitorThread.start(); }com.xxl.job.admin.core.complete.XxlJobCompleter#updateHandleInfoAndFinish处理任务结果有子任务触发子任务 /*** 任务结果刷新入口* common fresh handle entrance (limit only once)** param xxlJobLog 任务信息*/ public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {// finish 处理任务结果有子任务执行子任务finishJob(xxlJobLog);// text最大64kb 避免长度过长if (xxlJobLog.getHandleMsg().length() 15000) {xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg().substring(0, 15000));}// fresh handlereturn XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); }VI. 报表统计与日志清理【JobLogReportHelper】 按天统计报表数据xxl_job_log_report1分钟执行一次定时清理日志信息xxl_job_log24小时执行一次 /*** 初始化启动一个守护线程处理任务报表* 每分钟执行一次*/ public void start() {// 每一分钟执行一次logrThread new Thread(() - {// 上次清理日志时间long lastCleanLogTime 0;while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {// 分别统计今天,昨天,前天0~24点的数据 每天开始时间为 00:00:00.000 结束时间为23:59:59.999for (int i 0; i 3; i) {// 获取当前迁移i天的开始时间数据。Calendar itemDay Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);// 开始时间getTime() 是通过new Date()返回的。Date todayFrom itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);// 结束时间Date todayTo itemDay.getTime();XxlJobLogReport xxlJobLogReport new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);// 查询当前数据 开始时间为 00:00:00.000 结束时间为23:59:59.999MapString, Object triggerCountMap XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (MapUtil.isNotEmpty(triggerCountMap)) {// 触发总数int triggerDayCount Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault(triggerDayCount, 0)));// 运行中 trigger_code in (0, 200) and handle_code 0int triggerDayCountRunning Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault(triggerDayCountRunning, 0)));// 成功 handle_code 200int triggerDayCountSuc Integer.parseInt(String.valueOf(triggerCountMap.getOrDefault(triggerDayCountSuc, 0)));// 失败数据int triggerDayCountFail triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// do refresh 先执行更新无数据才插入能在一定程度上解决调度器执行器多节点并发问题// 旧数据执行更新新数据执行保存。更新返回的是变动行数小于1则表示库里不存在 。根据报表时间更新数据int ret XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret 1) {// 这里还是有很小的可能会同时执行到导致数据有多份的情况XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {log.error( xxl-job, job log report thread error:, e);}}// 2、log-clean: switch open once each day 开关打卡并且每24小时执行一次// 设置了保留日志天数并且有效时小于7为-1距离上次清理超过24小时if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() 0 System.currentTimeMillis() - lastCleanLogTime 24 * 60 * 60 * 1000) {// expire-time 获取开始清理时间。例如配置了7天今天是2023-08-12 那么clearBeforeTime就是2023-08-05 00:00:00.000Calendar expiredDay Calendar.getInstance();expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);Date clearBeforeTime expiredDay.getTime();// clean expired logListLong logIds;do {// 每次1000条 执行清理mysql in最多1000个logIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);if (CollUtil.isNotEmpty(logIds)) {XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (CollUtil.isNotEmpty(logIds));// update clean timelastCleanLogTime System.currentTimeMillis();}try {// 每1分钟钟执行一次TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {log.error(e.getMessage(), e);}}}log.info( xxl-job, job log report thread stop);});logrThread.setDaemon(true);logrThread.setName(xxl-job, admin JobLogReportHelper);logrThread.start(); }VII. 执行调度器【JobScheduleHelper】(核心) 执行调度器主要由包含了两个线程。一个线程scheduleThread负责加锁查询任务信息对任务按照触发时间分类并按照具体策略执行或者计算下次调度时间。对于执行时间间隔非常短的任务会根据具体的策略放入时间轮然后由另一个线程ringThread进行任务触发处理。 scheduleThread执行周期 扫描超时大于1000ms不等待直接继续执行。预读数据不为空执行周期为0-1000ms。预读数据为空执行周期为4000-5000ms // Wait seconds, align second 耗时小于1秒--数据少。可以sleep一会。数据多的情况下。一直执行 if (cost 1000) { // scan-overtime, not waittry {// pre-read period: success scan each second; fail skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}} }scheduleThread会加锁查询出下次执行时间在未来5秒以内的所有任务默认一次最多获取6000条。然后根据过期时间会分成三种对应处理。 触发器下次执行时间过期时间 5S触发器下次执行时间过期时间 5S触发器下次执行时间在未来5S以内。 ringThread主要处理时间轮中的定时任务执行周期为0-1000ms。 时间轮出自Netty中的HashedWheelTimer是一个环形结构可以用时钟来类比钟面上有很多bucket每一个bucket上可以存放多个任务使用一个List保存该时刻到期的所有任务同时一个指针随着时间流逝一格一格转动并执行对应bucket上所有到期的任务。任务通过取模决定应该放入哪个bucket。和HashMap的原理类似newTask对应put使用List来解决 Hash 冲突。 xxl-job中一个时间轮有60个bucket从0-59。用于存储当前秒执行的任务列表。 以上图为例假设一个bucket是1秒则指针转动一轮表示的时间段为60s假设当前指针指向0此时需要调度一个3s后执行的任务显然应该加入到(033)的方格中指针再走3s次就可以执行了 具体代码如下 package com.xxl.job.admin.core.thread;import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum; import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import com.xxl.job.admin.core.util.CollUtil; import lombok.extern.slf4j.Slf4j;import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;/*** 任务执行调度* p* 工作流程* 周期性的遍历所有的jobInfo这个表通过数据库的行锁和事务一致性通过for update 来保证多个调度中心集群在同一时间内只有一个调度中心在调度任务* p* 周期性的遍历所有的jobInfo这个表读取触发时间小于nowtime5s这个时间之前的所有任务然后进行引入以下触发机制判断* p* 三种触发任务机制* ol* linowtime-TriggerNextTime()PRE_READ_MS(5s) 既超过有效误差内【5秒内】则查看当前任务的失效调度策略若为立即重试一次则立即触发调度任务且触发类型为misfire/li* linowtime-TriggerNextTime()PRE_READ_MS(5s) 既没有超过有效误差【过5秒】则立即调度调度任务/li* linowtimeTriggerNextTime() 则说明这个任务马上就要触发了放到一个时间轮上https://blog.csdn.net/zalu9810/article/details/113396131/li* /ol* p* 随后将快要触发的任务放到时间轮上时间轮由key(将要触发的时间s)value(在当前触发s的所有任务id集合)然后更新这个任务的下一次触发时间* p* 这个时间轮的任务遍历交由第二个线程处理ringThread周期在1s之内周期的扫描这个时间轮然后执行调度任务** author xuxueli 2019-05-21*/ Slf4j public class JobScheduleHelper {private static JobScheduleHelper instance new JobScheduleHelper();public static JobScheduleHelper getInstance() {return instance;}/*** 预读误差时间5秒*/public static final long PRE_READ_MS 5000;/*** 调度线程执行周期【0-1000ms】、【4000-5000ms】内的随时时间执行*/private Thread scheduleThread;/*** 时间轮线程主要处理ringData中的任务数据。并触发任务。注意这里执行周期 0-1000ms*/private Thread ringThread;/*** 默认调度线程停止标志*/private volatile boolean scheduleThreadToStop false;/*** 时间轮线程停止标志*/private volatile boolean ringThreadToStop false;/*** 时间轮环上数据长度为60。即key的范围是0-59秒。value是在具体秒数需要执行的任务ID*/private volatile static MapInteger, ListInteger ringData new ConcurrentHashMap();public void start() {// schedule threadscheduleThread new Thread(() - {try {// sleep 4000-5000毫秒时间返回内随机避免各调度中心节点同时执行TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}log.info( init xxl-job admin scheduler success.);// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps 1000/50 20)// 每个触发器花费50ms,每个线程单位时间(秒)内处理20任务,默认最多同时处理300*206000任务int preReadCount (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start System.currentTimeMillis();Connection conn null;boolean connAutoCommit true;PreparedStatement preparedStatement null;// 查询成功标志判断有无数据boolean preReadSuc true;try {// 设置手动提交conn XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit conn.getAutoCommit();conn.setAutoCommit(false);// 获取任务调度锁表内数据信息,加写锁preparedStatement conn.prepareStatement(select * from xxl_job_lock where lock_name schedule_lock for update);preparedStatement.execute();// tx start// 1、pre readlong nowTime System.currentTimeMillis();// 查询条件1. 下次触发时间小于当前时间5s and 2.triggerStatus为1调度状态0-停止1-运行 and 3. 数据量默认取值为6000条【根据配置变动】// 任务调度错过触发时间时的可能原因服务重启调度线程被阻塞线程被耗尽上次调度持续阻塞下次调度被错过ListXxlJobInfo scheduleList XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime PRE_READ_MS, preReadCount);if (CollUtil.isNotEmpty(scheduleList)) {// 2、push time-ringfor (XxlJobInfo jobInfo : scheduleList) {// time-ring jumpif (nowTime jobInfo.getTriggerNextTime() PRE_READ_MS) {// 2.1、trigger-expire 5spass make next-trigger-time -- 任务过期超过5秒 当前时间-任务执行时间5秒 --按照过期策略处理并刷新下一次触发时间log.warn( xxl-job, schedule misfire, jobId {}, jobInfo.getId());// 1、misfire match 过期处理策略--FIRE_ONCE_NOW:立即执行一次MisfireStrategyEnum misfireStrategyEnum MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);log.debug( xxl-job, schedule push trigger : jobId {}, jobInfo.getId());}// 2、fresh next 刷新下一次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire 5sdirect-trigger make next-trigger-time 任务过期小于5秒 -- 直接触发任务并计算下次触发时间// 1、triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);log.debug( xxl-job, schedule push trigger : jobId {}, jobInfo.getId());// 2、fresh nextrefreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again 下次触发时间在未来5秒内这块跟else中逻辑一致目的是为了避免下次扫描时漏掉数据if (jobInfo.getTriggerStatus() 1 nowTime PRE_READ_MS jobInfo.getTriggerNextTime()) {// 1、make ring second 时间转化为秒 时间轮为长度为60 (如果执行时间为 2023/08/29 17:03:26 则返回26)int ringSecond (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring 将当前时间添加到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next 刷新下一次触发时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-readtime-ring trigger make next-trigger-time// 1、make ring secondint ringSecond (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info 更新任务信息long currentTime System.currentTimeMillis();for (XxlJobInfo jobInfo : scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}log.debug( xxl-job,更新任务信息耗时统计,count:{}, Time-consuming:{}ms, scheduleList.size(), System.currentTimeMillis() - currentTime);} else {preReadSuc false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {log.error( xxl-job, JobScheduleHelper#scheduleThread error:, e);}} finally {// commitif (conn ! null) {try {// 提交事务conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}try {// 设置为自动提交conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}try {// 关闭连接conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}// close PreparedStatementif (null ! preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}}long cost System.currentTimeMillis() - start;// Wait seconds, align second 耗时小于1秒--数据少。可以sleep一会。数据多的情况下。一直执行if (cost 1000) { // scan-overtime, not waittry {// pre-read period: success scan each second; fail skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {log.error(e.getMessage(), e);}}}}log.info( xxl-job, JobScheduleHelper#scheduleThread stop);});scheduleThread.setDaemon(true);scheduleThread.setName(xxl-job, admin JobScheduleHelper#scheduleThread);scheduleThread.start();// ring thread 时间轮ringThread new Thread(new Runnable() {Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {// 执行周期 0-1000msTimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {log.error(e.getMessage(), e);}}try {// second data// 时间轮上的数据集合。即任务ID集合ListInteger ringItemData new ArrayList();// 避免处理耗时太长跨过刻度向前校验一个刻度int nowSecond Calendar.getInstance().get(Calendar.SECOND);for (int i 0; i 2; i) {// (nowSecond 60 - i) % 60 和 (nowSecond - i) % 60 加60的目的避免为负数ListInteger tmpData ringData.remove((nowSecond 60 - i) % 60);if (tmpData ! null) {ringItemData.addAll(tmpData);}}// ring triggerlog.debug( xxl-job, time-ring beat : {} {}, nowSecond, Collections.singletonList(ringItemData));// do triggerfor (int jobId : ringItemData) {// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();} catch (Exception e) {if (!ringThreadToStop) {log.error( xxl-job, JobScheduleHelper#ringThread error:, e);}}}log.info( xxl-job, JobScheduleHelper#ringThread stop);}});ringThread.setDaemon(true);ringThread.setName(xxl-job, admin JobScheduleHelper#ringThread);ringThread.start();}/*** 计算任务下一次触发时间** param jobInfo 任务信息* param fromTime 当前时间* throws Exception exp*/private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {Date nextValidTime generateNextValidTime(jobInfo, fromTime);if (nextValidTime ! null) {jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());jobInfo.setTriggerNextTime(nextValidTime.getTime());} else {// 调度状态0-停止1-运行jobInfo.setTriggerStatus(0);jobInfo.setTriggerLastTime(0);jobInfo.setTriggerNextTime(0);log.warn( xxl-job, refreshNextValidTime fail for job: jobId{}, scheduleType{}, scheduleConf{}, jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());}}/*** 添加任务到时间轮** param ringSecond 时间【秒】* param jobId 任务id*/private void pushTimeRing(int ringSecond, int jobId) {// push async ring// 时间轮不存在对应时间时就新建一个list存在取值。list中添加任务idListInteger ringItemData ringData.computeIfAbsent(ringSecond, k - new ArrayList());ringItemData.add(jobId);log.debug( xxl-job, schedule push time-ring : {} {}, ringSecond, Collections.singletonList(ringItemData));}public void toStop() {// 1、stop schedulescheduleThreadToStop true;try {TimeUnit.SECONDS.sleep(1); // wait} catch (InterruptedException e) {log.error(e.getMessage(), e);}if (scheduleThread.getState() ! Thread.State.TERMINATED) {// interrupt and waitscheduleThread.interrupt();try {scheduleThread.join();} catch (InterruptedException e) {log.error(e.getMessage(), e);}}// if has ring databoolean hasRingData false;if (!ringData.isEmpty()) {for (int second : ringData.keySet()) {ListInteger tmpData ringData.get(second);if (tmpData ! null tmpData.size() 0) {hasRingData true;break;}}}if (hasRingData) {try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {log.error(e.getMessage(), e);}}// stop ring (wait job-in-memory stop)ringThreadToStop true;try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {log.error(e.getMessage(), e);}if (ringThread.getState() ! Thread.State.TERMINATED) {// interrupt and waitringThread.interrupt();try {ringThread.join();} catch (InterruptedException e) {log.error(e.getMessage(), e);}}log.info( xxl-job, JobScheduleHelper stop);}// ---------------------- tools ----------------------/*** 根据当前时间计算下次执行时间** param jobInfo 任务信息* param fromTime 当前时间* return 下次执行时间* throws Exception Exp*/public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {ScheduleTypeEnum scheduleTypeEnum ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);if (ScheduleTypeEnum.CRON scheduleTypeEnum) {// 返回满足cron表达式的给定日期/时间之后的下一个日期/时间return new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);} else if (ScheduleTypeEnum.FIX_RATE scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY scheduleTypeEnum*/) {// 当前时间之后的下一次时间 固定速率return new Date(fromTime.getTime() Integer.parseInt(jobInfo.getScheduleConf()) * 1000L);}return null;}} 执行器启动流程 执行器启动流程时序图 主要流程 在XxlJobSpringExecutor初始化时执行相关的方法解析标有XxlJob注解的方法标有Lazy的类不处理。将标注有XxlJob注解的方法转化为MethodJobHandler类并存储到XxlJobExecutor#jobHandlerRepository属性中。初始化SpringGlueFactory。初始化日志路径XxlJobFileAppender主要用于处理日志初始化admin-client用于进行任务回调以及心跳检查初始化日志清理线程JobLogFileCleanThread初始化任务回调线程TriggerCallbackThread启动内嵌服务EmbedServer基于netty实现 初始化 客户端执行器的核心接口是XxlJobExecutor主要有两个实现类XxlJobSimpleExecutor和XxlJobSpringExecutor。其中XxlJobSpringExecutor主要是针对spring框架的。 xxl-job整合Spring场景下需要手动配置XxlJobSpringExecutor实例并注册为bean。 Bean public XxlJobSpringExecutor xxlJobExecutor() {logger.info( xxl-job config init.);XxlJobSpringExecutor xxlJobSpringExecutor new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor; }在XxlJobExecutor接口中主要实现了执行器客户端的启动和销毁、admin-client远程调用初始化、executor-server远程调用初始化、JobHandler任务缓存、jobthread任务线程缓存。 接口继承体系XxlJobSpringExecutor注入了 ApplicationContext 对象。以及实现了 SmartInitializingSingleton 接口实现该接口的当spring容器初始完成紧接着执行监听器发送监听后就会遍历所有的Bean然后初始化所有单例非懒加载的bean最后在实例化阶段结束时触发回调接口。 com.xxl.job.core.executor.impl.XxlJobSpringExecutor#afterSingletonsInstantiated主要完成三件事 初始化调度器资源管理器从spring容器中将标记了XxlJob注解的方法将其封装并添加到map中刷新GlueFactory启动服务接收服务器请求等 // startOverridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method) 初始化任务 标记XxlJob注解的方法类型的initJobHandlerMethodRepository(applicationContext);// refresh GlueFactory 舒心GlueFactoryGlueFactory.refreshInstance(1);// super start 调用父类接口启动服务try {super.start();} catch (Exception e) {throw new RuntimeException(e);}}具体流程 I. 初始化JobHandler com.xxl.job.core.executor.impl.XxlJobSpringExecutor#initJobHandlerMethodRepository该方法主要做了如下事情 从spring容器获取所有对象并遍历查找方法上标记XxlJob注解的方法。将xxljob配置的jobname作为key根据初始化和销毁方法配置数据构造MethodJobHandler作为value注册jobHandlerRepository 中 任务执行接口IJobHandler,之前版本是自动注册IJobHandler接口的实现类的后续版本改为了注册标记了XxlJob注解的方法。如果有IJobHandler实现类形式需要自己注册。 com.xxl.job.core.executor.impl.XxlJobSpringExecutor#initJobHandlerMethodRepository方法比较简单。主要流程 加载所有非懒加载Bean找出标记了XxlJob注解的方法并解析初始化和销毁属性并构造MethodJobHandler类注册MethodJobHandler到jobHandlerRepository 缓存中。MethodJobHandler任务最终是通过反射调用执行的。 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext null) {return;}// init job handler from methodString[] beanDefinitionNames applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {// get beanObject bean null;Lazy onBean applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);if (onBean!null){logger.debug(xxl-job annotation scan, skip Lazy Bean:{}, beanDefinitionName);continue;}else {bean applicationContext.getBean(beanDefinitionName);}// filter methodMapMethod, XxlJob annotatedMethods null; // referred to org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookupXxlJob() {Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error(xxl-job method-jobhandler resolve error for bean[ beanDefinitionName ]., ex);}if (annotatedMethodsnull || annotatedMethods.isEmpty()) {continue;}// generate and regist method job handlerfor (Map.EntryMethod, XxlJob methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod methodXxlJobEntry.getKey();XxlJob xxlJob methodXxlJobEntry.getValue();// registregistJobHandler(xxlJob, bean, executeMethod);}}}com.xxl.job.core.executor.XxlJobExecutor#registJobHandler(com.xxl.job.core.handler.annotation.XxlJob, java.lang.Object, java.lang.reflect.Method)方法完成注册 protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob null) {return;}String name xxlJob.value();//make and simplify the variables since theyll be called several times laterClass? clazz bean.getClass();String methodName executeMethod.getName();if (name.trim().length() 0) {throw new RuntimeException(xxl-job method-jobhandler name invalid, for[ clazz # methodName ] .);}if (loadJobHandler(name) ! null) {throw new RuntimeException(xxl-job jobhandler[ name ] naming conflicts.);}// execute method/*if (!(method.getParameterTypes().length 1 method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException(xxl-job method-jobhandler param-classtype invalid, for[ bean.getClass() # method.getName() ] , The correct method format like \ public ReturnTString execute(String param) \ .);}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException(xxl-job method-jobhandler return-classtype invalid, for[ bean.getClass() # method.getName() ] , The correct method format like \ public ReturnTString execute(String param) \ .);}*/executeMethod.setAccessible(true);// init and destroyMethod initMethod null;Method destroyMethod null;if (xxlJob.init().trim().length() 0) {try {initMethod clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException(xxl-job method-jobhandler initMethod invalid, for[ clazz # methodName ] .);}}if (xxlJob.destroy().trim().length() 0) {try {destroyMethod clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException(xxl-job method-jobhandler destroyMethod invalid, for[ clazz # methodName ] .);}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}II. 刷新GlueFactory com.xxl.job.core.glue.GlueFactory#refreshInstance刷新GlueFactory工厂模式 public static void refreshInstance(int type){if (type 0) {glueFactory new GlueFactory();} else if (type 1) {glueFactory new SpringGlueFactory();}}III. 核心启动类【XxlJobExecutor】 该方法主要做了如下事情 初始化日志文件封装调度中心请求路径用于访问调度中心清除过期日志回调调度中心任务执行状态执行内嵌服务 com.xxl.job.core.executor.XxlJobExecutor#start方法 public void start() throws Exception {// init logpath 日志路径初始化XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client admin-client初始化initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken); }初始化日志文件【XxlJobFileAppender】 XxlJobFileAppender主要用于处理执行日志信息。包括日志路径初始化、创建日志文件、追加日志、读取日志信息等。 方法都比较简单这里不过多介绍。 初始化调度中心客户端【AdminBizClient】 AdminBizClient封装调度中心请求路径用于访问调度中心。 private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!null adminAddresses.trim().length()0) {for (String address: adminAddresses.trim().split(,)) {if (address!null address.trim().length()0) {AdminBiz adminBiz new AdminBizClient(address.trim(), accessToken);if (adminBizList null) {adminBizList new ArrayListAdminBiz();}adminBizList.add(adminBiz);}}}}执行器日志文件清理【JobLogFileCleanThread】 JobLogFileCleanThread日志文件清理线程主要用于日志文件清理。需要注意的是配置参数小于3天不执行清理。每天执行一次清理。 代码也非常简单 public void start(final long logRetentionDays) {// limit min valueif (logRetentionDays 3) {return;}localThread new Thread(new Runnable() {Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs ! null childDirs.length 0) {// todayCalendar todayCal Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY, 0);todayCal.set(Calendar.MINUTE, 0);todayCal.set(Calendar.SECOND, 0);todayCal.set(Calendar.MILLISECOND, 0);Date todayDate todayCal.getTime();for (File childFile : childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf(-) -1) {continue;}// file create dateDate logFileCreateDate null;try {SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd);logFileCreateDate simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate null) {continue;}if ((todayDate.getTime() - logFileCreateDate.getTime()) logRetentionDays * (24 * 60 * 60 * 1000)) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {// 每天执行一次TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, executor JobLogFileCleanThread thread destroy.);}});localThread.setDaemon(true);localThread.setName(xxl-job, executor JobLogFileCleanThread);localThread.start();}回调调度中心反馈任务结果【TriggerCallbackThread】 TriggerCallbackThread主要用于处理任务回调以及任务回调失败后的重试操作。 服务注册与心跳检测 服务注册主要是指执行器客户端每隔30s向调度中心定时发送执行器配置信息appName、address等在执行器中主要通过ExecutorRegistryThread类来完成。注册过程通过调用调度中心的api接口来完成注册信息传递。 在调度中心也会检测执行器是否失联(超过90s未上报数据)失联的执行器地址会被清理。 主要的核心类包括 执行器客户端 ExecutorRegistryThread执行器注册线程每隔30s向调度中心注册一次。通过AdminBizClient发送出注册请求都是post请求。 调度中心 AdminBizImpl接收到请求不出特殊处理转交给JobRegistryHelper完成注册JobRegistryHelper内部维护了registryOrRemoveThreadPool注册或者移除线程池用于异步处理客户端的注册请求。JobRegistryHelper内部还维护了registryMonitorThread监控线程用于处理超过90s未进行注册更新的执行器每30s处理一次。 参考资料 分布式任务调度平台XXL-JOBxxl-job源码解析(看这一篇就够了超简约且详细)_Nuan_Feng的博客-CSDN博客
http://www.zqtcl.cn/news/10005/

相关文章:

  • 手机小游戏网站大全扬州市邗江区城乡建设局网站
  • 做期货要看哪些网站酒泉建设局造价官网站
  • 哈尔滨品牌网站建设上海市建设考核中心网站
  • 用明星名字做网站网站推广计划书范文
  • 重新安wordpress网站机顶盒视频网站建设
  • 电商网站成本大型网站开发php框架
  • 手表网站免费设计企业管理软件公司排名
  • 如何对网站页面进行优化视频号认证需要多少钱
  • 网站开发后服务费网上免费开店怎么开
  • 南京一对一网站建设dwcc网站前台脚本怎么做音频
  • 朝西村小江网站建设怎么让网站自适应
  • 网站栏目结构优化公司网站建设一年多少钱
  • 网至普的营销型网站建设八师石河子精神文明建设网站
  • 网站推广的方法是什么网站开发使用的软件
  • 网站风格特点网站根目录文件夹
  • 深圳华强做网站网站空间域名注册
  • 淄博做网站seo用网页源代码下载文件
  • 苏州网站推广工具群晖可以做几个网站
  • 网站模板建设教程小公司做网站的好处
  • 网吧网站怎么做wordpress搭建博客系统
  • 盈润企业网站管理系统中国建筑查询网
  • 为什么网站显示乱码水资源论证网站建设
  • 为什么四川省建设厅网站打不开优化大师官网下载安装
  • 网站建设法语免费网站建设视频教程
  • 做网站会什么软件移动端网站建设的软件有哪些
  • 王野天 女演员黄冈网站seo
  • 公司网站是别人做的如何换logo个人seo外包
  • 成都青羊建设厅官方网站个人备案网站做企业会怎样
  • 长春网站网络公司自己做个网站怎么赚钱
  • 信誉好的做网站国际贸易网站开发