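We use APScheduler as the core scheduling module in our project, so I did some research on it, down to the source-code level.
1. Why APScheduler?
I took someone's word for it: APScheduler is to Python what Quartz is to Java. In practice it has worked well for us.
2. Installation
# Install with pip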
$ pip install apscheduler
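# Install from source: download and unpack the source archive listed at
# https://pypi.python.org/pypi/APScheduler/#downloads, then run this inside the unpacked directory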
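$ python setup.py install
3. APScheduler has four main components
1) triggers - decide when a job runs
2) job stores - hold the jobs, either in memory or persisted
3) executors - run the jobs (implemented on top of the concurrent.futures thread pool or process pool)
4) schedulers - control the other components (the background and blocking schedulers are the most commonly used)
Here is an example first:
# -*- coding:utf-8 -*-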
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
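

class ScheduleFactory(object):
    # Shared scheduler, created on first use so every factory returns the same one
    _scheduler = None

    def __init__(self):
        if ScheduleFactory._scheduler is None:
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            # RedisJobStore takes the db number plus redis connection keyword
            # arguments (host, port, ...), not an existing client object
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        # NOTE: `daemonic` is the APScheduler 2.x spelling; 3.x calls this option `daemon`
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                        job_defaults=job_defaults, daemonic=False)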
        return scheduler

Note: in the example above, ScheduleFactory is written as a singleton, so that only one scheduler instance exists globally.
4. Choosing a scheduler
Only two scenarios are covered here:
1) BackgroundScheduler: once the parent process that created the scheduler exits, its jobs stop being scheduled as well. Suited to: embedding in a service, for example Django.
2) BlockingScheduler: blocks the process that created the scheduler. Suited to: a program whose only task is scheduling.
After choosing a scheduler:
1) scheduler.start() starts the scheduler
2) scheduler.shutdown() stops the scheduler. By default it waits until all running jobs have finished before returning; pass wait=False to skip the wait.
The program now looks like this:

class ScheduleFactory(object):
    _scheduler = None

    def __init__(self):
        if ScheduleFactory._scheduler is None:
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                        job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors,
        #                               job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

5. Choosing job stores
There are two broad directions:
1) Non-persistent
Available store: MemoryJobStore
Suited to cases where you do not start and stop the scheduler frequently and can tolerate losing scheduled runs.
2) Persistent
Available stores: SQLAlchemyJobStore, RedisJobStore, MongoDBJobStore, ZooKeeperJobStore
Suited to cases where losing scheduled runs is not acceptable.
Job stores are configured with a dictionary, for example:

jobstores = {
    'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

The key is the name you give the store. Later, when adding a job, you can tell it which store to use. Here every job uses the store whose key is 'default':

    def add_job(self, job_func, interval, id, job_func_params=None):
        self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                               seconds=interval, id=id, kwargs=job_func_params,
                               executor='default', misfire_grace_time=30)

6. Choosing an executor
Only two are covered: the thread pool and the process pool. The default executor is a thread pool. max_workers is the real number of jobs that can run concurrently; if you set it too low and add many jobs, scheduled runs may be missed.
Also, because of the GIL, CPU-bound jobs running in Python threads will not reach the configured level of concurrency, so choose this number according to your actual workload.
In general we set it to 30, which is fine for ordinary scenarios.

executors = {
    'default': ThreadPoolExecutor(max_workers=30),
    'processpool': ProcessPoolExecutor(max_workers=30)
}

Likewise, add_job lets you choose the executor:

    def add_job(self, job_func, interval, id, job_func_params=None):
        self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                               seconds=interval, id=id, kwargs=job_func_params,
                               executor='default', misfire_grace_time=30)

7. Choosing a trigger
This is the simplest part. There are three, and none of them needs configuring:
1) date - run once at a specific date and time
2) interval - run at fixed intervals
3) cron - cron-style schedule expressions
8. For the API to add, remove, modify and query jobs, see the user guide:
http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s
9. Fixing problems
1) 2017-07-24 14:06:28,480 [apscheduler.executors.default:120] [WARNING]- Run time of job etl_func (trigger: interval[0:01:00], next run at: 2017-07-24 14:07:27 CST) was missed by 0:00:01.245424
The source code behind this message is:

def run_job(job, jobstore_alias, run_times, logger_name):
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.
    """
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
                                                run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                            exception=exc, traceback=formatted_tb))
            logger.exception('Job "%s" raised an exception', job)
        else:
            events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                            retval=retval))
            logger.info('Job "%s" executed successfully', job)

    return events
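
The grace-period comparison in the excerpt above is easy to try in isolation. What follows is a minimal standard-library sketch, not APScheduler code: the helper name `is_misfire` and the sample timestamps are assumptions made for illustration (only the lateness of 1.245424 seconds is taken from the warning quoted earlier).

```python
from datetime import datetime, timedelta, timezone


def is_misfire(run_time, now, misfire_grace_time):
    """Hypothetical helper mirroring the comparison made in run_job.

    Returns True when `now` is later than `run_time` by more than
    `misfire_grace_time` seconds. None disables the check.
    """
    if misfire_grace_time is None:
        return False
    difference = now - run_time
    return difference > timedelta(seconds=misfire_grace_time)


# Illustrative timestamps: the job was due at 14:06:27 UTC and the executor
# only got to it 1.245424 seconds later.
scheduled = datetime(2017, 7, 24, 14, 6, 27, tzinfo=timezone.utc)
started = scheduled + timedelta(seconds=1, microseconds=245424)

print(is_misfire(scheduled, started, 1))     # grace period of 1 second
print(is_misfire(scheduled, started, 30))    # grace period of 30 seconds
print(is_misfire(scheduled, started, None))  # check disabled
```

With a grace period of 1 second the run counts as missed; with 30 seconds, or with the check disabled, it would still be executed.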
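The relevant parameter here is misfire_grace_time, which defaults to 1 second. If the gap between the time a run actually starts and the time it was scheduled for exceeds misfire_grace_time, a warning is logged and that run is skipped.
Why does this happen? Either the executor does not have enough workers for the number of jobs you have added, or misfire_grace_time is still too small.
2) If you use trigger='interval' and have set something like misfire_grace_time=30, and the scheduler was first started at 10:50, the scheduled time and the actual execution can be up to 1 minute apart.
How do you fix this? Use next_run_time to set the time of the first run, rounded to a whole minute. For example:

    def add_job(self, job_func, interval, id, job_func_params=None):
        # Round up to the start of the next minute
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                               seconds=interval, id=id, kwargs=job_func_params,
                               executor='default', next_run_time=next_run_time,
                               misfire_grace_time=30)

3) 2017-07-25 11:02:00,003 [apscheduler.scheduler:962] [WARNING]- Execution of job rule_func (trigger: interval[0:01:00], next run at: 2017-07-25 11:02:00 CST) skipped: maximum number of running instances reached (1)
The corresponding source code is:

for job in due_jobs:
    # Look up the job's executor
    try:
        executor = self._lookup_executor(job.executor)
    except:
        self._logger.error(
            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
            'job store', job.executor, job)
        self.remove_job(job.id, jobstore_alias)
        continue

    run_times = job._get_run_times(now)
    run_times = run_times[-1:] if run_times and job.coalesce else run_times
    if run_times:
        try:
            executor.submit_job(job, run_times)
        except MaxInstancesReachedError:
            self._logger.warning(
                'Execution of job "%s" skipped: maximum number of running '
                'instances reached (%d)', job, job.max_instances)
            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                       jobstore_alias, run_times)
            events.append(event)

The source of submit_job:

with self._lock:
    if self._instances[job.id] >= job.max_instances:
        raise MaxInstancesReachedError(job)

    self._do_submit_job(job, run_times)
    self._instances[job.id] += 1

What does this mean? When the number of instances of a job that are already running has reached max_instances, this exception is raised and the new run is skipped. For example, when job1's 10:00 run comes due, the scheduler finds that two runs are queued for it. How can that happen? It can: the 09:59 run may not have executed successfully (or may still be running) but was persisted, so another attempt is made at 10:00.
max_instances defaults to 1. If you want such cases to go through, set max_instances higher, for example max_instances=3.
10. If you want to monitor your scheduling, APScheduler provides a listener mechanism that can listen for error events. You only need to register a listener.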
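
The second argument of add_listener is a bitmask: event codes are combined with `|`, and the callback is only invoked for events whose code is part of the mask. Below is a minimal standard-library sketch of that filtering. It is not APScheduler's implementation: the `Event` and `MiniDispatcher` classes and the numeric values of the constants are made up for illustration, and only the constant names mirror the library's.

```python
from dataclasses import dataclass

# Hypothetical flag values chosen for this sketch; each code is a distinct bit.
EVENT_JOB_EXECUTED = 1 << 0
EVENT_JOB_ERROR = 1 << 1
EVENT_JOB_MISSED = 1 << 2
EVENT_JOB_MAX_INSTANCES = 1 << 3


@dataclass
class Event:
    """Stand-in for a scheduler event (illustrative only)."""
    code: int
    job_id: str


class MiniDispatcher:
    """Toy dispatcher that filters events by mask before calling listeners."""

    def __init__(self):
        self._listeners = []  # list of (callback, mask) pairs

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def dispatch(self, event):
        for callback, mask in self._listeners:
            # A listener is called only if the event's bit is set in its mask.
            if event.code & mask:
                callback(event)


received = []
dispatcher = MiniDispatcher()
dispatcher.add_listener(
    lambda ev: received.append(ev.job_id),
    EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR,
)

dispatcher.dispatch(Event(EVENT_JOB_EXECUTED, "ok_job"))   # not in the mask
dispatcher.dispatch(Event(EVENT_JOB_ERROR, "bad_job"))     # delivered
dispatcher.dispatch(Event(EVENT_JOB_MISSED, "late_job"))   # delivered
print(received)
```

Only the error and missed events reach the callback, so a listener registered with an error-only mask stays quiet while jobs run normally.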
In our project the listener is registered like this:

    def add_err_listener(self):
        self.scheduler.add_listener(
            err_listener, EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR)


def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' % (ev.job_id)
    # RobotSender is our in-house DingTalk alert helper
    rs = RobotSender()
    rs.send("https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
            u"[apscheduler scheduling error] details: %s" % (msg),
            '15210885002',
            False)

The final code:
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from alarmkits.send_robot import RobotSender


class ScheduleFactory(object):
    # Shared scheduler, created on first use so every factory returns the same one
    _scheduler = None

    def __init__(self):
        if ScheduleFactory._scheduler is None:
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            # RedisJobStore takes the db number plus redis connection keyword arguments
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                        job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors,
        #                               job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

    def add_job(self, job_func, interval, id, job_func_params=None):
        # First run is aligned to the start of the next minute
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(
            job_func,
            jobstore='default',
            trigger='interval',
            seconds=interval,
            id=id,
            kwargs=job_func_params,
            executor='default',
            next_run_time=next_run_time,
            misfire_grace_time=30,
            max_instances=3
        )

    def remove_job(self, id):
        self.scheduler.remove_job(id)

    def modify_job(self, id, interval):
        # Changing the interval means replacing the trigger, which is what
        # reschedule_job does; modify_job only accepts attributes of the job itself
        self.scheduler.reschedule_job(id, trigger='interval', seconds=interval)

    def add_err_listener(self):
        self.scheduler.add_listener(
            err_listener, EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR)


def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' % (ev.job_id)
    rs = RobotSender()
    rs.send("https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
            u"[apscheduler scheduling error] details: %s" % (msg),
            '15210885002',
            False)

Reposted from: https://www.cnblogs.com/zhuminghui/p/9145319.html