做seo必须有自己网站吗,做ppt的兼职网站,开发公司产品部课件,怎么做属于自己的售卡网站首先声明一下 考虑到celery目前和asyncio的不兼容性#xff0c;协程任务需要转换为非异步的普通方法才能被当做task加入定时#xff0c;并且celery和asyncio使用可能会带来预想不到的问题#xff0c;在celery官方第二次承诺的6.0版本融合asyncio之前#xff0c;需要慎重考虑…首先声明一下 考虑到celery目前和asyncio的不兼容性协程任务需要转换为非异步的普通方法才能被当做task加入定时并且celery和asyncio使用可能会带来预想不到的问题在celery官方第二次承诺的6.0版本融合asyncio之前需要慎重考虑一下 如果你的项目是融合了asyncio的项目而且并不需要像celery文档中描述的那么多的复杂的定时功能一个轻量级的包APScheduler完全可以满足你的需求而且兼容asyncio框架
功能实现介绍
这是一个基于Sanic服务和Celery定时任务操作的功能实现的原理大致如下图
Server是我们的sanic服务负责接收和响应请求接收任务请求之后会异步非阻塞地将预警的定时任务交给celery处理Beat(Scheduler): 定期触发任务提前设置好的周期性或定时任务有可用worker时任务将会被执行这里我们的服务使用redis作为Beat SchedulerQueue: 接收的任务的队列使任务有序的进出是celery本身实现Worker: 执行任务Result Store(Result backend ): 存储任务的位置有需要时可召回任务的结果但是任务的结果会设置一个过期时间这里我们的服务使用redis作为Result Store
运行和使用的示例
sanic-celery server示例的目录结构
主要关注的内容在celery_app, query和第一层的sanic_server.py和结构settings.py保存的是项目的根目录
import os
import sysCELERY_BASE_DIR os.path.dirname(os.path.abspath(__file__))sys.path.insert(0, CELERY_BASE_DIR)celery
celery app启动
创建celery app并将celery app启动的配置信息加入配置信息在执行命令行启动celery之前加入都可以配置文件的内容可参考官方文档这里给出了简单示例的配置内容和说明注意4.x之后的celery配置变量要用小写的 # -*- coding:utf-8 -*-
from celery import Celeryfrom . import config
app Celery(app_name)
app.config_from_object(config)config.pybroker_url redis://localhost:6379/1
result_backend redis://localhost:6379/2
redbeat_redis_url redis://localhost:6379/3
redbeat_key_prefix roiq_redbeat_key_prefix:
# 任务运行结果过期时间默认一天传入秒数或者timedelta对象参考https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
result_expires 600task_serializer json
result_serializer json
accept_content [json]
timezone Asia/Shanghai
enable_utc True# (!)所有的tasks都要提前在这里imports
imports (query.tasks,send_email.tasks
)关于参数的更多详细说明可参考官方文档
Beat Scheduler是针对周期性任务和延时任务需求的非Django的celery默认不支持celery服务运行的时候修改任务状态的针对我们的业务需求我们需要在服务运行的时候增加、修改和查看任务因此引入了支持redis作为beat scheduler的模块redbeatredbeat的使用参考链接只需要使用其中的创建、更新和删除等常用操作方法
参考redbeat入门链接安装好redbeat之后以redbeat作为celery的beat启动celery不配置redbeat_redis_url时默认broker也是beat
celery启动命令
在windows环境下beat要和worker、broker分开启动
指定readbeat作为beat启动celery
在命令行执行celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval10
-A是celery app的位置这里celery_app的__init__.py中包含celery appbeat指定需要启动beat默认不启动-S指定beat的Scheduler对象-l是loglevel打印日志的信息等级支持info, debug等关键字–max-interval指定beat检查新修改的任务的间隔时间默认5分钟这里为了方便调试设置为10秒钟比较实时地看到结果
启动worker
在命令行执行celery -A worker -l debug -P gevent为了支持windows上运行需要先安装gevent(pip install gevent)在linux不需要-P选项
更多参数和详情可以用celery --helpcelery worker --help, celery beat --help查看
启动celery服务之后测试celery运行时的修改操作
redbeat在celery运行时修改任务的操作
使用redbeat支持在celery运行时修改任务的操作执行时确保celery的app、worker、beat服务和redis等存储服务都在运行
一个模拟的定时任务
query/tasks.py
# -*- coding:utf-8 -*-import asyncio
import timeimport pandas as pdfrom celery_app import appasync def countdown_task(a, b):以一个简单的方法代替sql查询的taskawait asyncio.sleep(1)for i in range(3):print(f-------{i}---------)time.sleep(1)return abapp.task
def sync_countdown_task(a, b):return asyncio.get_running_loop().run_until_complete(countdown_task(a, b))由于项目中使用的全都是异步协程方法需要将协程转换为普通的任务才能够注册为celery的task
sanic_server.py
# -*- coding:utf-8 -*-
import asyncio
from datetime import timedeltafrom celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry
from sanic import Sanic
from sanic import responsefrom celery_app import app as celery_app
from celery_app.config import redbeat_key_prefix
from query.tasks import sync_countdown_tasksanic_app Sanic(sanic_celery)loop asyncio.get_event_loop()# 开始定时任务需要在不重启celery服务的情况下将任务添加到beat
async def query_task_create(request):通过此api创建周期性的查询任务tasks fquery.tasks # 任务所在的模块具体到.py文件sche schedule(timedelta(seconds5))task_name sync_countdown_task.__name__task f{tasks}.{task_name}entry RedBeatSchedulerEntry(task_name, task, sche, args(1, 2), appcelery_app)print(entry)key entry.key # key存到数据库...entry.save() return response.text(fschedule2 created..., task key is: {key})async def schedule_disable(request):task_name sync_countdown_task.__name__key redbeat_key_prefix task_name # key 可以entry RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled Falseentry.save()print(entry)return response.text(schedule disabled..)async def schedule_enable(request):task_name sync_countdown_task.__name__key redbeat_key_prefix task_nameentry RedBeatSchedulerEntry.from_key(key, celery_app)entry.enabled Trueentry.save()print(entry)return response.text(schedule enabled..)async def schedule_delete(request):task_name sync_countdown_task.__name__ # 请求时获得最开始也是用数据库存储和获取task_key f{redbeat_key_prefix}{sync_countdown_task.__name__}entry RedBeatSchedulerEntry.from_key(task_key, appcelery_app)print(entry)entry.delete()print(删除后的entry: , entry)return response.text(task_name deleted)async def schedule_update(request):task_name sync_countdown_task.__name__ # 请求时获得最开始也是用数据库存储和获取task_key f{redbeat_key_prefix}{sync_countdown_task.__name__}# 获取task keyentry RedBeatSchedulerEntry.from_key(task_key, appcelery_app) # (!)要考虑任务已经删除key不存在的情况print(entry)# 修改scheduleentry.schedule schedule(timedelta(seconds3))# 修改参数entry.args (3, 4)entry.save()print(entry)return response.text(task_name updated)async def schedule_info(request):task_key f{redbeat_key_prefix}{sync_countdown_task.__name__}entry RedBeatSchedulerEntry.from_key(task_key, appcelery_app)return response.text(f{entry})sanic_app.add_route(query_task_create, /create2)
sanic_app.add_route(schedule_update, /update)
sanic_app.add_route(schedule_delete, /delete)
sanic_app.add_route(schedule_disable, /disable)
sanic_app.add_route(schedule_enable, /enable)
sanic_app.add_route(schedule_info, /info)if __name__ __main__:sanic_app.run(port4321)注更新和删除等操作的key/task_key的获取在上线时需要从数据库中存储和获取
设置定时任务的运作流程
设定celery配置存放于config.py中也可以用其他方式存储创建app导入配置的内容编写好task和server调用的apicelery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval10类似的命令运行beatcelery -A celery_app worker -l debug -P gevent -E类似的命令运行worker运行sanic服务根据api传入的参数使用redbeat.RedBeatSchedulerEntry创建定时任务使用RedBeatSchedulerEntry.from_key()获取并修改定时任务根据api用户和产品返回已设定的定时任务列表供用户查看和操作