黄山网站建设,一个网站的建站流程,网络培训远程教育平台,成都建设网站的公司有哪些Celery的实践指南celery原理#xff1a;celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统#xff0c;消费者(worker)和生产者(client)都可以有任意个#xff0c;他们通过消息系统#xff08;broker#xff09;来通信。典型的场景为#xff1a;客户… Celery的实践指南 celery原理 celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统消费者(worker)和生产者(client)都可以有任意个他们通过消息系统broker来通信。 典型的场景为 客户端启动一个进程生产者当用户的某些操作耗时较长或者比较频繁时考虑接入本消息系统发送一个task任务给broker。后台启动一个worker进程消费者当发现broker中保存有某个任务到了该执行的时间他就会拿过来根据task类型和参数执行。 实践中的典型场景 简单的定时任务替换crontab的celery写法from celery import Celeryfrom celery.schedules import crontabapp Celery(tasks, backendredis://localhost, brokerredis://localhost)app.conf.update(CELERYBEAT_SCHEDULE { add: { task: celery_demo.add, schedule: crontab(minute*), args: (16, 16) },})app.taskdef add(x, y): return x y运行celery的worker让他作为consumer运行自动从broker上获得任务并执行。celery -A celery_demo worker运行celery的client让其根据schedule自动生产出task msg并发布到broker上。celery -A celery_demo beat安装并运行flower方便监控task的运行状态celery flower -A celery_demo或者设置登录密码 celery flower -A celery_demo --basic_authuser1:password1,user2:password2 多同步任务-链式任务-失败自动重试的task失败重试方法 将task代码函数参数增加self同时绑定bind。demo代码 app.task(bindTrue, default_retry_delay300, max_retries5)def my_task_A(self): try: print(doing stuff here...) except SomeNetworkException as e: print(maybe do some clenup here....) self.retry(e) 自动重试后是否将任务重新入queue后排队还是等待指定的时间可以通过self.retry()参数来指定。派发到不同Queue队列的task一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。比如把queue的exchange和routing_key配置成通用模式再定义task的routing_key的名称可用的不同exchange策略direct直接根据定义routing_keytopicexchange会根据通配符来将一个消息推送到多个queue。fanout将消息拆分分别推送到不同queue通常用于超大任务耗时任务。参考http://celery.readthedocs.org/en/latest/userguide/routing.html#routers高级配置result是否保存失败邮件通知关闭rate limit:auto_reload方法*nix系统celery通过监控源代码目录的改动自动地进行reload使用方法1.依赖inotifyLinux 2. kqueueOS X / BSD安装依赖 $ pip install pyinotify (可选) 指定fsNotify的依赖 $ env CELERYD_FSNOTIFYstat celery worker -l info --autoreload 启动 celery -A appname worker --autoreloadauto-scale方法启用auto-scale临时增加worker进程数量增加consumer $ celery -A proj control add_consumer foo -d worker1.local 临时减少worker进程数量减少consumer 将scheduled task的配置从app.conf变成DB的方法需要在启动时指定custom schedule 类名比如默认的是 celery.beat.PersistentScheduler 。 celery -A proj beat -S djcelery.schedulers.DatabaseScheduler 启动停止worker的方法启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizingroot用户可以使用celeryd非特权用户celery multi start worker1 -A appName —autoreload --pidfile$HOME/run/celery/%n.pid --logfile$HOME/log/celery/%n.log或者 celery worker —detach停止 ps auxww | grep celery worker | awk {print $2} | xargs kill -9 与Flask集成的方法集成后flask将充当producer来创建并发送task给broker在celery启动的独立worker进程将从broker中获得task并执行同时将结果返回。flask中异步地获得task结果的方法add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args(x,y), countdown30)flask获得与flask集成后的启动问题由于celery的默认routing_key是根据生产者在代码中的import级别来设定的所以worker端在启动时应该注意其启动目录应该在项目顶级目录上否者会出现KeyError。性能提升 eventlet 和 greenlet 官方参考http://docs.celeryproject.org/en/latest/userguide/index.html 转载于:https://www.cnblogs.com/ToDoToTry/p/5453149.html