百度网站介绍显示图片,装修公司一般多少钱一平方,微网站建设讯息,搜索引擎优化的主要工作celery介绍 # celery 的概念#xff1a; * 翻译过来是芹菜 * 官网#xff1a;https://docs.celeryq.dev/en/stable/ # 是分布式的异步任务框架#xff1a; 分布式#xff1a;一个任务#xff0c;拆成多个任务在不同机器上做 异步任务#xff1a;后台执行…celery介绍 # celery 的概念 * 翻译过来是芹菜 * 官网https://docs.celeryq.dev/en/stable/ # 是分布式的异步任务框架 分布式一个任务拆成多个任务在不同机器上做 异步任务后台执行不阻塞主任务 框架集成到项目中 # 作用 1、异步任务异步()发邮件短信通知 2、延迟任务延迟几秒 再执行某个任务 订单提交后延迟半小时把订单取消 3、定时任务 每隔多长事件 执行某个任务比如定时更新缓存 eg.买了个会员会员快到期了会每天给你发短信提醒 celery架构 # django 是一个服务celery 也是是一个服务和django没有必然联系 -命令启动就能提供服务# 三个模块 1、broker消息中间件消息队列任务中间件 存储任务函数发送短信任务统计在线人数... redis: 使用字符串形式能把任务表示出来即可 reabbitmq 存储: 其实就是一个队列一个个任务任务 2、worker任务执行单元可以启动多个 从消息队列broker的redis取出任务然后去执行程序(进程) 3、backend结果存储 Result Stores 任务执行完成后的结果存储在这里 redis存储关系型数据库。。# 执行流程 1、其他程序提交任务(函数)任务序列化后存到celery的broker中 用redis 0 库 2、接下来worker执行从broker中取任务然后执行 3、任务执行完后把结果存到 bancked中 用redis 1库 # 注意 celery和其他程序是 独立运行的 1、可以不依赖任何服务器通过自身命令启动服务(内部支持socket) 2、celery服务为为其他项目服务提供异步解决任务需求的 注会有两个服务同时运行一个是项目服务django一个是celery服务 项目服务将需要异步处理的任务交给celery服务celery就会在需要时异步完成项目的需求 # 例子 人django是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务 正常情况下人可以完成所有健康情况的动作不需要医院的参与但当人生病时就会被医院接收解决人生病问题 人生病的处理方案交给医院来解决所有人不生病时医院独立运行人生病时医院就来解决人生病的需求 celery快速使用 # 使用步骤 1、安装pip install celery 2、写个py文件 demo.py from celery import Celery
import time
broker redis://127.0.0.1:6379/1
backend redis://127.0.0.1:6379/2
app Celery(app, brokerbroker, backendbackend)
# 写任务---》写函数---》必须用 app.task 装饰---》装饰后就变成了celery的任务了
app.task
def hello():time.sleep(2) # 模拟任务延迟return hello world
app.task
def add(a, b):time.sleep(3)return a b 3、其他程序中提交任务 resadd.delay(4,5)
print(res) 4、 启动worker---worker启动可以再靠前 win运行 pip3 install eventlet celery -A demo worker -l info -P eventlet 非win运行mac linux celery -A demo worker -l info 5、查询结果 ---直接取redis中查 ---使用代码查询 from demo import app
from celery.result import AsyncResult
id 17bf03ad-a1e6-49d1-a182-794bd3e96b74
if __name__ __main__:a AsyncResult(idid, appapp)if a.successful():result a.get() # hello worldprint(result)elif a.failed():print(任务失败)elif a.status PENDING:print(任务等待中被执行)elif a.status RETRY:print(任务异常后正在重试)elif a.status STARTED:print(任务已经开始被执行) celery包结构 # 包结构 目录如下: --celery_task --celery.py --user_task.py --order_task.py --goods_task.py -其他程序中提交任务: add_task_package.py -其他程序中查询结果get_result_package.py ###具体步骤 # celery.py
from celery import Celery
#######1 实例化得到对象 ##########
broker redis://127.0.0.1:6379/1
backend redis://127.0.0.1:6379/2
app Celery(app, brokerbroker, backendbackend,include[celery_task.order_task,celery_task.user_task]) #######2 写任务 ##########以后各种类型任务单独写在py文件中
# order_task.py user_task.py
# order_task.py
import time
from .celery import app
app.task
def cancel_order(order_id):time.sleep(2)return 订单%s取消成功 % order_id # user_task.py
import time
from .celery import app
app.task
def send_sms(phone, code):time.sleep(1)return 手机号%s发送验证码%s成功 % (phone, code) ######## 3 其他程序提交任务 ##############################
from celery_task.user_task import send_sms
ressend_sms.delay(1893424323,8888)
print(res) ##### 4 启动worker########## 在包的一层执行包不需要具体到某个py文件了 win运行pip3 install eventlet A celery_demo 包名----因为他会去包下找 celery.py 中得app执行 celery -A celery_task worker -l info -P eventlet 非win运行mac linux celery -A celery_task worker -l info #####5 查询结果#####
# 使用代码查询结果
from celery_task.celery import app
from celery.result import AsyncResultid 46b26c73-62ae-403c-ba62-e469f2f8c69f
if __name__ __main__:a AsyncResult(idid, appapp)if a.successful():result a.get() # hello worldprint(result)elif a.failed():print(任务失败)elif a.status PENDING:print(任务等待中被执行)elif a.status RETRY:print(任务异常后正在重试)elif a.status STARTED:print(任务已经开始被执行) celery实现异步任务定时任务延迟任务 # 异步任务: 任务名.delay(传参数) # 延迟任务---延迟多长事件干事 点右键执行work运行 # 链接上面任务二的order函数
from celery_task.order_task import cancel_order
from datetime import datetime, timedelta# atetime.utcnow() 当前utc时间
eta datetime.utcnow() timedelta(seconds15)
res cancel_order.apply_async(args[10001,], etaeta) # 订单15s 后执行这个任务
print(res) # 定时任务--一定要启动beat 1、 在celery.py 中写 # 链接上面任务二的user_task函数
# 时区
app.conf.timezone Asia/Shanghai
# 不使用UTC时间
app.conf.enable_utc False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule {send_sms: {task: celery_task.user_task.send_sms, # 执行的任务函数 schedule: timedelta(seconds3), # 每隔三秒钟干# schedule: crontab(hour8, day_of_week1), # 每周一早八点args: (1896388888, 6666), # 传参数手机号\验证码}
} 2、启动workercelery -A celery_task worker -l info -P eventlet 3、启动beat(每个一段时间就提交任务)celery -A celery_task beat -l info 4、等待即可 django中使用celery # 两种方案 -通用方案自己封装 -django-celery--》app---》创建出一些表 # 自己封装的通用方案: 1、把封装的包celery_task 复制到项目中 2、在django中使用celery.py 中必须加入 os.environ.setdefault(DJANGO_SETTINGS_MODULE, luffy_api.settings.dev) 3 写任务启动worker 4 在django的视图类中异步调用即可 # celery_task/celery.py
from celery import Celery
import os# 任务里使用django的东西缓存表模型。。。必须加入
os.environ.setdefault(DJANGO_SETTINGS_MODULE, luffy_api.settings.dev)# 1 实例化得到对象
broker redis://127.0.0.1:6379/0
backend redis://127.0.0.1:6379/1
app Celery(app, brokerbroker, backendbackend, include[celery_task.order_task, celery_task.user_task])# 2 写任务以后各种类型任务单独写在py文件中# 定时任务
app.conf.timezone Asia/Shanghai # 时区
app.conf.enable_utc False # 不使用UTC# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule {
}# celery_task/user_task.py
import time
from .celery import app
from libs.send_information import common_send_smsapp.task
def send_sms(phone, code):res common_send_sms(code, mobilephone)if res:return 短信发送成功%s % phoneelse:return 短信发送失败%s % phone # user/views.py# 发送短信接口action(methods[get], detailFalse, url_pathsend_information)def send_sms(self, request):try:mobile request.query_params[mobile] # 取的手机号放在请求地址栏中code get_code() # 生成验证码cache.set(sms_code_%s % mobile, code, 61) # 放在缓存中以手机号做区# 异步发送短信--不管是否成功--如果不成功用户再发一次即可# t Thread(targetcommon_send_sms, args[code, mobile])# t.start() # 启动线程发送短信# celery异步发送短信send_sms.delay(mobile, code)return APIResponse(msg短信已发送)except MultiValueDictKeyError as e:raise APIException(detail手机号必须携带)except Exception as e:raise APIException(detailstr(e)) 双写一致性(缓存问题) # 轮播图加缓存 出现问题banner表中数据变了缓存不会变 mysql和redis数据不一致 mysql和redis双写一致性 # 双写一致性的解决方案 1、mysql修改---删缓存 2、mysql修改---改缓存 3、定时更新---每个5s更新一次缓存 先删缓存在更新mysql 先改缓存再更新mysql # 轮播图的接口---使用定时更新解决双写一致性问题 # celery_task/home_task.p
import time
from .celery import app
from home.models import Banner
from django.conf import settings
from home.serializer import BannerSerializer
from django.core.cache import cacheapp.task
def update_banner():# 1 获取所有轮播图数据queryset Banner.objects.all().filter(is_deleteFalse, is_showTrue).order_by(orders)[:settings.BANNER_COUNT]# 2 序列化ser BannerSerializer(instancequeryset, manyTrue)# ser.data# 2.1 把服务端前缀拼接上for item in ser.data:# media/banner/banner1.png,item[image] settings.BACKEND_URL item[image]# 3 放到缓存cache.set(banner_list, ser.data)return 更新成功 # celery.py
app Celery(app, brokerbroker, backendbackend, include[celery_task.order_task, celery_task.user_task,celery_task.home_task])app.conf.beat_schedule {update_banner: {task: celery_task.home_task.update_banner,# schedule: timedelta(seconds5),schedule: timedelta(minutes3),args: (),},
} # 启动workercelery -A celery_task worker -l info -P eventlet 启动beatcelery -A celery_task beat -l info # 以后尽管改 mysql数据最多3分钟就会更新到最新了 异步秒杀方案 # 秒杀功能 并发量要高承载住很多用户同时操作 订单表 扣减库存 效率要高 # 同步秒杀 假设秒杀需要10s钟项目并发量是3总共5个商品要秒杀 10s内只有3个人能进入到系统并且开始秒杀 # 前端
const routes [{path: /,name: home,component: HomeView},{path: /seckill,name: seckill,component: SeckillView},
] templatedivHeader/Headerdiv stylepadding: 50px;margin-left: 100pxh1Go语言课程/h1img srchttp://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.pngheight300pxwidth300pxbrel-button typedanger clickhandleSeckill v-loading.fullscreen.lockfullscreenLoading秒杀课程/el-button/divbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrbrFooter/Footer/div
/templatescript
import Header from /components/Header
import Footer from /components/Footerexport default {name: SckillView,data() {return {fullscreenLoading: false,task_id: ,t: null,}},methods: {// ##############同步秒杀##############// handleSeckill() {// this.fullscreenLoading true;// this.$axios({// url: /user/seckill/seckill/,// method: POST,// data: {// course_id: 99// }// }).then(res {// this.fullscreenLoading false;// this.$message.success(res.msg)// }).catch(res {// this.fullscreenLoading false;// this.$message.error(res)// })// }// ##############同步秒杀##############// ##############异步秒杀##############handleSeckill() {this.fullscreenLoading true;this.$axios({url: /user/seckill/seckill/,method: POST,data: {course_id: 99}}).then(res {// 在排队转圈的还需要继续显示this.$message.success(res.msg)this.task_id res.task_id// 继续发送请求---》查询是否秒杀成功1 成功 2 没成功 3 秒杀任务还没执行// 启动定时任务没隔1s向后端发送一次请求this.t setInterval(() {this.$axios({url: /user/seckill/get_result/,method: get,params: {task_id: this.task_id}}).then(res {// 100 成功,success : 1 成功 0 失败 2 还没开始if (res.success 1) {// 转圈框不显示this.fullscreenLoading false;// 停止定时任务clearInterval(this.t)this.t nullthis.$message.success(res.msg)} else if (res.success 0) {// 转圈框不显示this.fullscreenLoading false;// 停止定时任务clearInterval(this.t)this.t nullthis.$message.error(res.msg)} else {// this.$message.error(res.msg)console.log(res.msg)}})}, 1000)}).catch(res {this.fullscreenLoading false;this.$message.error(res)})}},components: {Header, Footer}
}
/scriptstyle scoped/style # 后端
# 秒杀功能
import randomfrom celery_task.order_task import seckill
from celery_task.celery import app
from celery.result import AsyncResultclass SeckillView(ViewSet):# 同步操作性能不高# 异步提交任务action(methods[POST], detailFalse)def seckill(self, request, *args, **kwargs):course_id request.data.get(course_id)task_id seckill.delay(course_id)return APIResponse(msg您正在排队, task_idstr(task_id))action(methods[GET], detailFalse)def get_result(self, request, *args, **kwargs):task_id request.query_params.get(task_id)a AsyncResult(idtask_id, appapp)if a.successful():result a.get() # True 和 Falseif result:return APIResponse(success1, msg秒杀成功)else:return APIResponse(success0, msg秒杀失败)elif a.status PENDING:print(任务等待中被执行)return APIResponse(success2, msg任务等待中被执行)else:return APIResponse(success3, msg秒杀任务正在执行) # 任务
import random
import timeapp.task
def seckill(course_id):print(根据课程id%s查询课程是否还有剩余耗时2s % course_id)time.sleep(2)res random.choice([True, False])if res: # 库存够print(扣减库存耗时1s)time.sleep(1)print(下单耗时2s)time.sleep(2)return Trueelse:return False# 路由
router.register(seckill, SeckillView, seckill) 今日思维导图