提供温州手机网站制作哪家好,建工论坛网,wordpress 小说模版,商派商城网站建设二次开发参考博客#xff1a;https://www.cnblogs.com/pyedu/p/12461819.html
参考视频#xff1a;01 celery的工作机制_哔哩哔哩_bilibili 定义#xff1a;简单灵活、处理大量消息的分布式系统#xff0c;专注于实时处理异步队列#xff0c;支持任务调度 主要架构#xff1a;
…参考博客https://www.cnblogs.com/pyedu/p/12461819.html
参考视频01 celery的工作机制_哔哩哔哩_bilibili 定义简单灵活、处理大量消息的分布式系统专注于实时处理异步队列支持任务调度 主要架构
消息中间件message broker 可以集成第三方消息中间件如Redis、RabbitMQ任务执行单元worker 是celery提供的执行的任务执行的单元并发分布在分布式的系统节点中任务执行结果存储task result store来存储执行任务的结果支持方式 redis、AMQP
同步请求 顺序进行IO操作等待阻塞进程依次执行
异步请求异步进行当IO操作阻塞时放到执行单元中完成放到数据库中而不影响其他单元的执行当主进程需要阻塞的进程结果时会向是数据库中取出该数据(即将耗时操作放到异步队列中不影响主进程的执行)继续向下进行
使用场景
异步任务将耗时操作任务提交到celery异步执行如发送短信、消息推送、音视频处理定时任务定时执行某件事情如每日数据统计
主要优点
简单使用和维护不要配置文件只需添加基本信息的配置高可用在work和client网络连接丢失或失败时会自动进行重试快速单个celery进程可每分钟处理百万级任务只需要毫秒级的往返延迟灵活可以扩展使用自定义池的实现、序列化、日志记录、消费者、broker消息传输
安装
pip install celery
实践案例 异步任务执行文件celery_task.py
消费者模型import celery
import time
# task.py
import osos.environ.setdefault(FORKED_BY_MULTIPROCESSING, 1)backendredis://127.0.0.1:6379/1
brokerredis://127.0.0.1:6379/2
celcelery.Celery(test,backendbackend,brokerbroker)
cel.task
def send_email(name):print(向%s发送邮件...%name)time.sleep(5)print(向%s发送邮件完成%name)return okcel.task
def send_msg(name):print(向%s发送短信...%name)time.sleep(5)print(向%s发送短信完成%name)return ok
执行任务文件: produce_task.py
生成者模型from celery_task import send_email,send_msg
result send_email.delay(yuan) # 当执行delay函数时会自动调用消息中间件的任务执行队列放到任务执行单元中
print(result.id)
result send_msg.delay(alex)
print(result.id)
先启动redis进程 使用特定命令下发指令执行celery任务
(注意celery5.0之前的命令是不一样的celery worker -A celery_task -l info) 先执行produce_task.py
返回ID: fd27bc20-ccac-4855-9b3d-150708bad2a6 c07cb5b1-845a-44c4-963b-7ce3f92b98c8 检查celery的异步队列查看执行结果 注当遇到以下情况 The above exception was the direct cause of the following exception: Traceback (most recent call last): File D:\python3\lib\site-packages\billiard\pool.py, line 361, in workloop result (True, prepare_result(fun(*args, **kwargs))) File D:\python3\lib\site-packages\celery\app\trace.py, line 664, in fast_trace_task tasks, accept, hostname _loc ValueError: not enough values to unpack (expected 3, got 0) [2024-02-24 15:31:20,394: ERROR/MainProcess] Task handler raised error: ValueError(not enough values to unpack (expected 3, got 0)) 解决方法 在消费者模型中添加以下代码 import os
os.environ.setdefault(FORKED_BY_MULTIPROCESSING, 1)查看异步执行的结果 查看任务执行结果: result.pyfrom celery.result import AsyncResult
from celery_task import celasync_resultAsyncResult(idfd27bc20-ccac-4855-9b3d-150708bad2a6, appcel)if async_result.successful():result async_result.get()print(result)# result.forget() # 将结果删除
elif async_result.failed():print(执行失败)
elif async_result.status PENDING:print(任务等待中被执行)
elif async_result.status RETRY:print(任务异常后正在重试)
elif async_result.status STARTED:print(任务已经开始被执行)# 运行结果是上面执行返回的结果
ok
celery多任务结构下异步执行注意celery_tasks的celery名字是固定不然会报错 # celery
from celery import Celerycel Celery(celery_demo,brokerredis://127.0.0.1:6379/1,backendredis://127.0.0.1:6379/2,# 包含以下两个任务文件去相应的py文件中找任务对多个任务做分类include[celery_tasks.task01,celery_tasks.task02])# 时区
cel.conf.timezone Asia/Shanghai
# 是否使用UTC
cel.conf.enable_utc False# task01
import time
from .celery import celcel.task
def send_email(res):time.sleep(5)return 完成向%s发送邮件任务%res# task02
import time
from .celery import cel
cel.task
def send_msg(name):time.sleep(5)return 完成向%s发送短信任务%name#
执行任务文件: produce_task.py 和上面的celery_task保持在同一级目录
生成者模型from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg# 立即告知celery去执行test_celery任务并传入一个参数
result send_email.delay(yuan)
print(result.id)
result send_msg.delay(yuan)
print(result.id)
E:\desktop\my_drf\celeryprocelery -A celery_tasks worker -l info -P eventlet
运行结果 定时任务的配置
# 更新produce_task 文件增加定时任务
from celery_task import send_email
from datetime import datetime# 方式一
# v1 datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result send_email.apply_async(args[egon,], etav2) # 定时任务
# print(result.id)# 方式二
ctime datetime.now()
# 默认用utc时间
utc_ctime datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay timedelta(seconds10) # 当时时间10s后执行任务
task_time utc_ctime time_delay# 使用apply_async并设定时间
result send_email.apply_async(args[egon], etatask_time)
print(result.id)# 更新setting
cel.conf.beat_schedule {# 名字随意命名add-every-10-seconds: {# 执行tasks1下的test_celery函数task: celery_tasks.task01.send_email,# 每隔2秒执行一次# schedule: 1.0,# schedule: crontab(minute*/1),schedule: timedelta(seconds6),# 传递参数args: (张三,)},# add-every-12-seconds: {# task: celery_tasks.task01.send_email,# 每年4月11号8点42分执行# schedule: crontab(minute42, hour8, day_of_month11, month_of_year4),# args: (张三,)# },
}
运行结果 根据上述配置每6s执行task01发送邮件任务
注意
# 周期性执行任务单元要注意先启动beat进程而后执行worker单元
E:\desktop\my_drf\celeryprocelery -A celery_tasks beat
E:\desktop\my_drf\celeryprocelery -A celery_tasks worker -l info -P eventlet注意 当打开beat后而若没有打开worker执行单元会导致beat进程不断向数据库中加入数据 查看redis堆积的数据方法cmd命令如下 python脚本实现 celery结合django中集成的运用 # tasks
# celery的任务必须写在tasks.py的文件中别的文件名称不识别!!!
from mycelery.main import app
import timeimport logging
log logging.getLogger(django)app.task # name表示设置任务的名称如果不填写则默认使用函数名做为任务名
def send_sms(mobile):发送短信print(向手机号%s发送短信成功!%mobile)time.sleep(5)return send_sms OKapp.task # name表示设置任务的名称如果不填写则默认使用函数名做为任务名
def send_sms2(mobile):print(向手机号%s发送短信成功! % mobile)time.sleep(5)return send_sms2 OK# config
broker_url redis://127.0.0.1:6379/15
result_backend redis://127.0.0.1:6379/14# main
# 主程序
import os
from celery import Celery
# 创建celery实例对象
app Celery(sms)
# import os
os.environ.setdefault(FORKED_BY_MULTIPROCESSING, 1) # 注意 默认配置要这样配置下列的配置会找不到组件导致失败
# 把celery和django进行组合识别和加载django的配置文件
# os.environ.setdefault(DJANGO_SETTINGS_MODULE, celerypro.settings.dev)
# os.environ.setdefault(DJANGO_SETTINGS_MODULE, config.settings.local)# 通过app对象加载配置
app.config_from_object(mycelery.config)# 加载任务
# 参数必须必须是一个列表里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks([任务1,任务2])
app.autodiscover_tasks([mycelery.sms,])# view
from django.shortcuts import render,HttpResponse
from mycelery.sms.tasks import send_sms,send_sms2
from datetime import timedeltafrom datetime import datetime
def test(request):################################# 异步任务# 1. 声明一个和celery一模一样的任务函数但是我们可以导包来解决send_sms.delay(110)send_sms2.delay(119)# send_sms.delay() # 如果调用的任务函数没有参数则不需要填写任何内容################################# 定时任务ctime datetime.now()# 默认用utc时间utc_ctime datetime.utcfromtimestamp(ctime.timestamp())time_delay timedelta(seconds3) # 3s 发送消息task_time utc_ctime time_delayresult send_sms.apply_async([911, ], etatask_time)print(result.id)return HttpResponse(ok)
启动Celery的命令
# 强烈建议切换目录到mycelery根目录下启动
# E:\desktop\my_drf\celeryprocelery -A mycelery.main worker --loglevelinfo
运行结果