官方网站建设 搜搜磐石网络,牡丹江宣传网,专业集团门户网站建设公司,wordpress果酱主题分享github 地址#xff1a;https://github.com/agronholm/apscheduler apscheduler 基本概念介绍 说到定时任务#xff0c;会想起 linux 自带的 crontab #xff0c;windows 自带的任务计划#xff0c;都可以实现守时任务。操作系统基本都会提供定时任务的实现#xff0c;但是…
github 地址https://github.com/agronholm/apscheduler apscheduler 基本概念介绍 说到定时任务会想起 linux 自带的 crontab windows 自带的任务计划都可以实现守时任务。操作系统基本都会提供定时任务的实现但是如果想要更加精细化的控制或者说任务程序需要跨平台运行最好还是自己实现定时任务框架Python中定时任务的解决方案总体来说有四种分别是 crontab、 scheduler、 Celery、 APScheduler其中 crontab 不适合多台服务器的配置、 scheduler 太过于简单、 Celery 依赖的软件比较多比较耗资源。最好的解决方案就是 APScheduler。Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。 apscheduler ( advance python scheduler ) 是 Python 中的任务调度库使用起来十分方便 。提供了基于 日期、固定时间间隔、crontab 类型的任务可以在主程序的运行过程中快速增加新作业或删除旧作业如果把作业存储在数据库中那么作业的状态会被保存当调度器重启时不必重新添加作业作业会恢复原状态继续执行。apscheduler 可以当作一个跨平台的调度工具来使用可以做为 linux 系统 crontab 工具或 windows 计划任务程序的替换。注意apscheduler 不是一个守护进程或服务它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行也就是说apscheduler 为我们提供了构建专用调度器或调度服务的基础模块。
安装pip install apschedule 调度器的工作流程 APScheduler 组件介绍 APScheduler 由5个部分组成触发器、调度器、任务存储器、执行器、任务事件。
任务 job任务id 和 任务执行 func触发器triggers确定任务何时开始执行。触发器包含调度逻辑描述一个任务何时被触发按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器除了初始配置之外触发器是完全无状态的。作业存储器job stores 也叫 任务存储器保存任务的状态。作业存储器指定了作业被存放的位置默认情况下作业保存在内存也可将作业保存在各种数据库中当作业被存放在数据库中时它会被序列化当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。执行器executors确定任务怎么执行。执行器是将指定的作业调用函数提交到线程池或进程池中运行当任务完成时执行器通知调度器触发相应的事件。调度器schedulers任务调度器属于控制角色通过它配置作业存储器、执行器和触发器添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行通常只有一个调度程序运行在应用程序中开发人员通常不需要直接处理作业存储器、执行器或触发器配置作业存储器和执行器是通过调度器来完成的调度器会自动完成。调度器串联任务的整个生命周期添加编辑任务到任务存储器,在任务的执行时间到来时把任务交给执行器执行返回结果同时发出事件监听监控任务事件 。任务事件 event监控任务执行异常情况触发器
触发器决定何时执行任务APScheduler 支持的触发器有3种 triggerinterval按固定时间周期执行支持weeksdayshoursminutes seconds 还可指定时间范围sched.add_job(job_function, interval, hours2, start_date2010-10-10 09:30:00, end_date2014-06-15 11:00:00) triggerdate固定时间执行一次。sched.add_job(my_job, date, run_datedatetime(2009, 11, 6, 16, 30, 5), args[text]) triggercron 支持 crontab 方式执行任务。也可指定时间范围。 year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) # 星期一到星期五5点30执行任务 job_function直到2014-05-30 00:00:00 sched.add_job(job_function, cron, day_of_weekmon-fri, hour5, minute30, end_date2014-05-30) # 按照crontab格式执行 格式为分钟 小时 天 月 周*表示所有 # 5月到8月的1号到15号0点0分执行任务job_function sched.add_job(job_function, CronTrigger.from_crontab(0 0 1-15 may-aug *))
触发器 的 参数
date 定时作业只执行一次。
run_date (datetime|str) – the date/time to run the job attimezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, date, run_datedate(2009, 11, 6), args[text])
# 2019年7月6号16时30分5s执行
sched.add_job(my_job, date, run_datedatetime(2019, 7, 6, 16, 30, 5), args[text])
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):print(text)scheduler BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, date, run_datedate(2019, 8, 30), args[text1])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, date, run_datedatetime(2019, 8, 30, 1, 0, 0), args[text2])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, date, run_date2019-8-30 01:00:00, args[text3])scheduler.start()interval 间隔调度
weeks (int) – number of weeks to waitdays (int) – number of days to waithours (int) – number of hours to waitminutes (int) – number of minutes to waitseconds (int) – number of seconds to waitstart_date (datetime|str) – starting point for the interval calculationend_date (datetime|str) – latest possible date/time to trigger ontimezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
scheduler.add_job(job_function, interval, hours2)import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t time.strftime(%Y-%m-%d %H:%M:%S, time.localtime(time.time()))print({} --- {}.format(text, t))scheduler BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, interval, minutes1, args[job1])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, interval, minutes1, seconds30, start_date2019-08-29 22:15:00,end_date2019-08-29 22:17:00, args[job2])scheduler.start()
运行结果
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...余下省略...cron 调度
year (int|str) – 4-digit yearmonth (int|str) – month (1-12)day (int|str) – day of the (1-31)week (int|str) – ISO week (1-53)day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)hour (int|str) – hour (0-23)minute (int|str) – minute (0-59)second (int|str) – second (0-59)start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)end_date (datetime|str) – latest possible date/time to trigger on (inclusive)timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
scheduler.add_job(job_function, cron, month6-8,11-12, day3rd fri, hour0-3)
# 每周一到周五运行 直到2024-05-30 00:00:00
scheduler.add_job(job_function, cron, day_of_weekmon-fri, hour5, minute30, end_date2024-05-30
也可以用表达式类型可以用以下方式
表达式字段描述*任何在每个值都触发*/a任何每隔 a触发一次a-b任何在 a-b区间内任何一个时间触发 a必须小于 ba-b/c任何在 a-b区间内每隔 c触发一次xth yday第 x个星期 y触发lastxday最后一个星期 x触发lastday一个月中的最后一天触发x,y,z任何可以把上面的表达式进行组合
示例如下
from apscheduler.schedulers.blocking import BlockingSchedulerdef tick():passscheduler BlockingScheduler
scheduler.add_job(tick, cron, day4th sun, hour20, minute1)
scheduler.start()import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef job(text):t time.strftime(%Y-%m-%d %H:%M:%S, time.localtime(time.time()))print({} --- {}.format(text, t))scheduler BlockingScheduler()
# 在每天22点每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, cron, hour22, minute*/1, args[job1])
# 在每天22和23点的25分运行一次 job 方法
scheduler.add_job(job, cron, hour22-23, minute25, args[job2])
# 在每天 8 点运行一次 job 方法
scheduler.add_job(job, cron, hour8, args[job2])
# 在每天 8 点 20点各运行一次 job 方法 设置最大运行实例数
scheduler.add_job(job, cron, hour8, 20, minute30, max_instances4)scheduler.start()
运行结果
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...余下省略...任务存储器
任务存储器决定任务的保存方式 默认存储在内存中MemoryJobStore重启后就没有了。不同的任务存储器可以在调度器的配置中进行配置见调度器。APScheduler 支持的任务存储器有 apscheduler.jobstores.memory内存 apscheduler.jobstores.mongodb存储在 mongodb apscheduler.jobstores.redis存储在 redis apscheduler.jobstores.rethinkdb存储在 rethinkdb apscheduler.jobstores.sqlalchemy支持 sqlalchemy 的数据库如 mysqlsqlite 等 apscheduler.jobstores.zookeeperzookeeper执行器
执行器决定如何执行任务常用的有 pool(线程/进程) 和 gevent(io多路复用支持高并发)默认为 pool 中线程池 不同的执行器可以在调度器的配置中进行配置见调度器。执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor 已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序 apscheduler也设置了不同的 executor apscheduler.executors.asyncio同步 io阻塞 apscheduler.executors.geventio 多路复用非阻塞 apscheduler.executors.pool: 线程ThreadPoolExecutor 和 进程ProcessPoolExecutor apscheduler.executors.twisted基于事件驱动调度器
调度器的主循环其实就是反复检查是不是有到时需要执行的任务分以下 2 步进行
询问自己的每一个作业存储器有没有到期需要执行的任务如果有计算这些作业中每个作业需要运行的时间点如果时间点有多个做 coalesce 检查。设置 coalesce 为 False 设置这个目的是比如由于某个原因导致某个任务积攒了很多次没有执行比如有一个任务是1分钟跑一次但是系统原因断了5分钟如果 coalesce True 那么下次恢复运行的时候会只执行一次而如果设置 coalesce False 那么就不会合并会5次全部执行。提交给执行器按时间点运行。在配置调度器前首先要选取合适应用环境场景的 调度器存储器 和 执行器。APScheduler 支持的调度器方式如下比较常用的为 BlockingScheduler 和 BackgroundScheduler下面是各调度器的适用场景可以满足绝大多数的应用环境
BlockingScheduler适用于调度程序是进程中唯一运行的进程调用start函数会阻塞当前线程不能立即返回。BackgroundScheduler适用于调度程序在应用程序的后台运行调用 start 后主线程不会阻塞。 AsyncIOScheduler适用于使用了 asyncio 模块的应用程序。 GeventScheduler适用于使用 gevent 模块的应用程序。 TwistedScheduler适用于构建 Twisted 的应用程序。 QtScheduler适用于构建 Qt 的应用程序。
调度器可以操作任务并为任务指定触发器、任务存储器和执行器和监控任务。
scheduler.add_job(job_func, triggerinterval, args[1], id1, namea test job, max_instances10, jobstoredefault, executordefault, seconds10) 作业存储器 的选择有两种
一 是内存也是默认的配置。适用场景重启整个应用程序后作业会被重新添加到调度器中此时简单的选取内存作为作业存储器即简单又高效。二 是 数据库。适用场景当调度器重启或应用程序崩溃时需要作业从中断位置恢复正常运行那么可以选择将作业存储在数据库中至于适用什么数据库可以自由选择PostgreSQL 是推荐的选择因为它具有强大的数据完整性保护。
执行器 的选择也取决于应用场景。
通常默认的 ThreadPoolExecutor 已经足够好。如果作业负载涉及CPU 密集型操作那么应该考虑使用 ProcessPoolExecutor甚至可以同时使用这两种执行器将 ProcessPoolExecutor 行器添加为二级执行器。调度器 的 配置
调度器配置在 add_job 我们看到 jobstore 和 executor 都是 defaultAPScheduler 在定义调度器时可以指定不同的任务存储和执行器以及初始的参数
from pytz import utc
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor# 通过dict方式执行不同的 job_stores、executors和默认的参数
job_stores {mongo: MongoDBJobStore(),default: SQLAlchemyJobStore(urlsqlite:///jobs.sqlite)
}
executors {default: ThreadPoolExecutor(20),process_pool: ProcessPoolExecutor(5)
}
job_defaults {coalesce: False,max_instances: 3
}
# 定义调度器
scheduler BackgroundScheduler(jobstoresjob_stores, executorsexecutors,job_defaultsjob_defaults, timezoneutc
)def job_func(job_id):print(fjob {job_id} is run at {datetime.datetime.now().replace(microsecond0)})# 添加任务 scheduler.add_job(job_func, triggerinterval, args[1], id1, namea test job, jobstoredefault, executorprocess_pool, seconds10
)
# 启动调度器
scheduler.start()操作任务调度器可以增加删除暂停恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用已经执行和正在执行的任务不受这些操作的影响。 add_job添加任务
scheduler.add_job(job_func, triggerinterval, args[1], id1, namea test job, max_instances10, jobstoredefault, executordefault, seconds10
)
remove_job通过任务唯一的 id删除的时候对应的任务存储器里记录也会删除
scheduler.add_job(myfunc, interval, minutes2, idmy_job_id)
scheduler.remove_job(my_job_id)
Pausing and resuming jobs暂停和重启任务
scheduler.add_job(myfunc, interval, minutes2, idmy_job_id)
scheduler.pause_job(my_job_id)
scheduler.resume_job(my_job_id)
Modifying jobs修改任务的配置
job scheduler.add_job(myfunc, interval, minutes2, idmy_job_id, max_instances10
)
# 修改任务的属性
job.modify(max_instances6, nameAlternate name)
# 修改任务的触发器
scheduler.reschedule_job(my_job_id, triggercron, minute*/5)
获取作业 (job) 列表。通过 get_jobs() 返回所有 job 实例或者 print_jobs() 输出格式化的作业列表
scheduler.get_jobs()
scheduler.print_jobs()
关闭调度器
默认情况下调度器会等待所有正在运行的作业完成后关闭所有的调度器和作业存储。如果你不想等待可以将wait选项设置为False。
scheduler.shutdown()
scheduler.shutdown(waitFalse)
监控任务事件类型比较常用的类型有 EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发 EVENT_JOB_EXECUTED任务执行成功时 EVENT_JOB_MAX_INSTANCES调度器上执行的任务超过配置的参数时
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) apscheduler 提供了许多不同的方法来配置调度器。可以使用字典也可以使用关键字参数传递。首先实例化调度程序添加作业然后配置调度器获得最大的灵活性。
如果调度程序在应用程序的后台运行选择 BackgroundScheduler并使用默认的 jobstore 和默认的executor则以下配置即可
from apscheduler.schedulers.background import BackgroundSchedulerscheduler BackgroundScheduler()假如我们想配置更多信息设置两个执行器、两个作业存储器、调整新作业的默认值并设置不同的时区。下述三个方法是完全等同的。 配置需求
配置名为 “mongo” 的 MongoDBJobStore 作业存储器配置名为 “default” 的 SQLAlchemyJobStore (使用SQLite)配置名为 “default” 的 ThreadPoolExecutor最大线程数为20配置名为 “processpool” 的 ProcessPoolExecutor最大进程数为5UTC 作为调度器的时区coalesce 默认情况下关闭作业的默认最大运行实例限制为 3
方法 1
from pytz import utcfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjob_stores {mongo: MongoDBJobStore(),default: SQLAlchemyJobStore(urlsqlite:///jobs.sqlite)
}
executors {default: ThreadPoolExecutor(20),process_pool: ProcessPoolExecutor(5)
}
job_defaults {coalesce: False,max_instances: 3
}
scheduler BackgroundScheduler(jobstoresjob_stores, executorsexecutors, job_defaultsjob_defaults, timezoneutc
)方法二
from apscheduler.schedulers.background import BackgroundSchedulerscheduler BackgroundScheduler({apscheduler.jobstores.mongo: {type: mongodb},apscheduler.jobstores.default: {type: sqlalchemy,url: sqlite:///jobs.sqlite},apscheduler.executors.default: {class: apscheduler.executors.pool:ThreadPoolExecutor,max_workers: 20},apscheduler.executors.processpool: {type: processpool,max_workers: 5},apscheduler.job_defaults.coalesce: false,apscheduler.job_defaults.max_instances: 3,apscheduler.timezone: UTC,
})方法三
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutorjob_stores {mongo: {type: mongodb},default: SQLAlchemyJobStore(urlsqlite:///jobs.sqlite)
}
executors {default: {type: threadpool, max_workers: 20},processpool: ProcessPoolExecutor(max_workers5)
}
job_defaults {coalesce: False,max_instances: 3
}
scheduler BackgroundScheduler()# .. do something else here, maybe add jobs etc.以上涵盖了大多数情况的调度器配置在实际运行时可以试试不同的配置会有怎样不同的效果。 启动 调度器
启动调度器前需要先添加作业有两种方法向调度器添加作业
一是通过接口 add_job()二是通过使用 函数装饰器其中 add_job() 返回一个 apscheduler.job.Job 类 的实例用于后续修改或删除作业。
我们可以随时在调度器上调度作业。如果在添加作业时调度器还没有启动那么任务将不会运行并且第一次运行时间在调度器启动时计算。
注意如果使用的是序列化作业的执行器或作业存储器那么要求被调用的作业函数必须是全局可访问的被调用的作业的参数是可序列化的作业存储器中只有 MemoryJobStore 不会序列化作业。执行器中只有 ProcessPoolExecutor 将序列化作业。
启用调度器只需要调用调度器的 start() 方法 使用 add_job 添加 job
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetime
import time
import loggingdef job_function():print(Hello World str(datetime.datetime.now()))if __name__ __main__:log logging.getLogger(apscheduler.executors.default)log.setLevel(logging.INFO) # DEBUG fmt logging.Formatter(%(levelname)s:%(name)s:%(message)s)h logging.StreamHandler()h.setFormatter(fmt)log.addHandler(h)print(start to do it)sched BlockingScheduler()# Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, cron, day_of_weekmon-fri, hour0-9, minute*, second*/4)sched.start() 使用 装饰器 添加 job
使用装饰器来展示一个调度的使用
from apscheduler.schedulers.blocking import BlockingSchedulerblocking_scheduler BlockingScheduler()blocking_scheduler.scheduled_job(interval, seconds3)
def timed_job():print(This job is run every three minutes.)blocking_scheduler.scheduled_job(cron, day_of_weekmon-fri, hour0-9, minute30-59, second*/3)
def scheduled_job():print(This job is run every weekday at 5pm.)print(before the start function)
blocking_scheduler.start()
print(let us figure out the situation) 调度器 的 事件监听
如果程序有异常抛出会影响整个调度任务吗请看下面的代码运行一下看看会发生什么情况
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetimedef aps_test(x):print(1 / 0)print(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), x)scheduler BlockingScheduler()
scheduler.add_job(funcaps_test, args(定时任务,), triggercron, second*/5)scheduler.start()每 5 秒抛出一次报错信息。任何代码都可能抛出异常关键是发生导常事件如何第一时间知道这才是我们最关心的apscheduler 已经为我们想到了这些提供了事件监听来解决这一问题。将上述代码稍做调整加入日志记录和事件监听如下所示。
import datetime
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERRORlogging.basicConfig(levellogging.INFO,format%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s,datefmt%Y-%m-%d %H:%M:%S,filenamelog1.txt,filemodea
)def aps_test(x):print(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), x)def date_test(x):print(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), x)print(1 / 0)def my_listener(event):if event.exception:print(任务出错了)else:print(任务照常运行...)scheduler BlockingScheduler()
scheduler.add_job(funcdate_test, args(一次性任务,会出错,),next_run_timedatetime.datetime.now() datetime.timedelta(seconds15), iddate_task
)
scheduler.add_job(funcaps_test, args(循环任务,), triggerinterval, seconds3, idinterval_task
)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger loggingscheduler.start()示例 示例 1 --- 间隔性任务
# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print(Tick! The time is: %s % datetime.now())if __name__ __main__:scheduler BlockingScheduler()scheduler.add_job(task_func, interval, seconds3)print(Press Ctrl{0} to exit.format(Break if os.name nt else C ))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass说明
导入调度器模块 BlockingScheduler这是最简单的调度器调用 start 方阻塞当前进程如果你的程序只用于调度除了调度进程外没有其他后台进程那么请用 BlockingScheduler 非常有用此时调度进程相当于守护进程。定义一个函数 task_func 代表我们要调度的作业程序。实例化一个 BlockingScheduler 类不带参数表明使用默认的作业存储器-内存默认的执行器是线程池执行器最大并发线程数默认为 10 个另一个是进程池执行器。add_job 是添加一个作业 task_func触发器为 interval每隔 3 秒执行一次另外的触发器为 datecron。date 按特定时间点触发cron 则按固定的时间间隔触发。加入捕捉用户中断执行和解释器退出异常pass 关键字表示什么也不做。
程序执行结果是每 3 秒打印出了当前时间。 示例 2 --- cron 任务
# -*- coding: utf-8 -*-from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef task_func():print(Tick! The time is: %s % datetime.now())if __name__ __main__:scheduler BlockingScheduler()scheduler.add_job(task_func, cron, hour19, minute23)print(Press Ctrl{0} to exit.format(Break if os.name nt else C ))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass定时 cron 任务也非常简单直接给触发器 trigger 传入 cron 即可。hour 19minute 23 这里表示每天的 19:23 分 执行任务。这里可以填写数字也可以填写字符串 示例设置后台任务
import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundSchedulerdef tick():print(Tick! The time is: %s % datetime.now())# 非阻塞cron类型触发器
scheduler BackgroundScheduler()
scheduler.add_job(tick, cron, day_of_weekmon-fri, hour9, minute30)if __name__ __main__:# 运行任务调度scheduler.start()print(scheduler.get_jobs())print(Press Ctrl{0} to exit.format(Break if os.name nt else C))try:while True:time.sleep(5)print(sleep!)except (KeyboardInterrupt, SystemExit):# 关闭调度scheduler.shutdown(waitFalse)print(Exit The Job!)示例 设置任务监听 import datetime
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR# 任务执行函数
def job_func(job_id):print(job %s is runed at %s % (job_id, datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S)))# 事件监听
def job_exception_listener(event):if event.exception:# todo异常处理, 告警等 print(The job crashed :()else:print(The job worked :))# 日志 # 定义一个后台任务非阻塞调度器
scheduler BackgroundScheduler()
# 添加一个任务到内存中
# 触发器triggerinterval seconds10 每10s触发执行一次
# 执行器executordefault 线程执行
# 任务存储器jobstoredefault 默认内存存储
# 最大并发数max_instances
scheduler.add_job(job_func, triggerinterval, args[1], id1, namea test job, max_instances10, jobstoredefault, executordefault, seconds10
)
# 设置任务监听
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 启动调度器
scheduler.start()示例使用默认的作业存储器
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutordef my_job(idmy_job):print(id, --, datetime.datetime.now())job_stores {default: MemoryJobStore()}
executors {default: ThreadPoolExecutor(20),processpool: ProcessPoolExecutor(10)
}
job_defaults {coalesce: False,max_instances: 3
}
scheduler BlockingScheduler(jobstoresjob_stores, executorsexecutors, job_defaultsjob_defaults
)
scheduler.add_job(my_job, args[job_interval, ], idjob_interval, triggerinterval, seconds5, replace_existingTrue
)
scheduler.add_job(my_job, args[job_cron, ], idjob_cron, triggercron, month4-8,11-12, hour7-11, second*/10, end_date2021-05-30
)
scheduler.add_job(my_job, args[job_once_now, ], idjob_once_now)
scheduler.add_job(my_job, args[job_date_once, ], idjob_date_once, triggerdate, run_date2018-04-05 07:48:05
)
try:scheduler.start()
except SystemExit:print(exit)exit()上述代码使用内存作为作业存储器操作比较简单重启程序相当于第一次运行。 示例使用数据库作为存储器
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoredef my_job(idmy_job):print(id, --, datetime.datetime.now())job_stores {default: SQLAlchemyJobStore(urlsqlite:///jobs.sqlite)
}
executors {default: ThreadPoolExecutor(20),processpool: ProcessPoolExecutor(10)
}
job_defaults {coalesce: False,max_instances: 3
}scheduler BlockingScheduler(jobstoresjob_stores, executorsexecutors, job_defaultsjob_defaults
)
scheduler.add_job(my_job, args[job_interval, ], idjob_interval,triggerinterval, seconds5, replace_existingTrue
)
scheduler.add_job(my_job, args[job_cron, ], idjob_cron,triggercron, month4-8,11-12, hour7-11, second*/10,end_date2018-05-30
)
scheduler.add_job(my_job, args[job_once_now, ], idjob_once_now)
scheduler.add_job(my_job, args[job_date_once, ], idjob_date_once,triggerdate, run_date2018-04-05 07:48:05
)
try:scheduler.start()
except SystemExit:print(exit)exit()运行程序时提示有作业本应在 2018-04-05 07:48:05 运行的作业没有运行因为现在的时间为 2018-04-05 08:06:34错过了 0:18:28 的时间。 如果将过期 job 注释掉重新运行本程序则四种类型的作业仍会运行结果如下
Run time of job my_job (trigger: cron[month4-8,11-12, hour7-11, second*/10], next run at: 2018-04-05 08:14:40 CST) was missed by 0:00:23.680603
Run time of job my_job (trigger: cron[month4-8,11-12, hour7-11, second*/10], next run at: 2018-04-05 08:14:40 CST) was missed by 0:00:13.681604
Run time of job my_job (trigger: cron[month4-8,11-12, hour7-11, second*/10], next run at: 2018-04-05 08:14:40 CST) was missed by 0:00:03.681604
……
Run time of job my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST) was missed by 0:00:15.687917
Run time of job my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST) was missed by 0:00:10.687917
Run time of job my_job (trigger: interval[0:00:05], next run at: 2018-04-05 08:14:38 CST) was missed by 0:00:05.687917
job_interval -- 2018-04-05 08:14:33.821645
job_interval -- 2018-04-05 08:14:38.529167
job_cron -- 2018-04-05 08:14:40.150080
job_interval -- 2018-04-05 08:14:43.296188
job_interval -- 2018-04-05 08:14:48.327317作业仍会运行说明作业被添加到数据库中程序中断后重新运行时会自动从数据库读取作业信息而不需要重新再添加到调度器中如果不注释 21-25 行添加作业的代码则作业会重新添加到数据库中这样就有了两个同样的作业避免出现这种情况可以在 add_job 的参数中增加 replace_existingTrue如scheduler.add_job(my_job, args[job_interval,],idjob_interval,triggerinterval,seconds3,replace_existingTrue)
如果我们想运行错过运行的作业使用 misfire_grace_time如scheduler.add_job(my_job,args [job_cron,] ,idjob_cron,triggercron,month4-8,11-12,hour7-11,second*/15,coalesceTrue,misfire_grace_time30,replace_existingTrue,end_date2018-05-30)
说明misfire_grace_time假如一个作业本来 08:00 有一次执行但是由于某种原因没有被调度上现在 08:01 了这个 08:00 的运行实例被提交时会检查它预订运行的时间和当下时间的差值这里是1分钟大于我们设置的 30 秒限制那么这个运行实例不会被执行。最常见的情形是 scheduler 被 shutdown 后重启某个任务会积攒了好几次没执行如 5 次下次这个作业被提交给执行器时执行 5 次。设置 coalesceTrue 后只会执行一次。 其他操作如下
scheduler.remove_job(job_id,jobstoreNone)#删除作业
scheduler.remove_all_jobs(jobstoreNone)#删除所有作业
scheduler.pause_job(job_id,jobstoreNone)#暂停作业
scheduler.resume_job(job_id,jobstoreNone)#恢复作业
scheduler.modify_job(job_id, jobstoreNone, **changes)#修改单个作业属性信息
scheduler.reschedule_job(job_id, jobstoreNone, triggerNone,**trigger_args)#修改单个作业的触发器并更新下次运行时间
scheduler.print_jobs(jobstoreNone, outsys.stdout)#输出作业信息