Django-celery-beat动态添加周期性任务实现过程解析
作者:-零 发布时间:2021-07-29 13:09:49
前期准备
1.beat插件安装
pip3 install django-celery-beat
2.注册APP
INSTALLED_APPS = [
....
'django_celery_beat',
]
3.数据库变更
python3 manage.py migrate django_celery_beat
配置工作
目录结构请参考://www.jb51.net/article/200659.htm
1.配置celerypro.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from django.utils import timezone
# set the default Django settings module for the 'celery' program.
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings')
# 创建celery app
app = Celery('voice_quality_assurance_configure')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 从单独的配置模块中加载配置
app.config_from_object('voice_quality_assurance_configure.celeryconfig')
# 设置app自动加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now
2.配置celeryconfig.py
from __future__ import absolute_import
from kombu import Queue
from django.conf import settings
# 设置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 指定时区,默认是 UTC
CELERY_TIMEZONE='Asia/Shanghai'
# celery 序列化与反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_IGNORE_RESULT = True
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 10
# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# celery beat配置(周期性任务设置)
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
3.分别启动woker和beta
项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)
celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #
启动beta 调度器使用数据库
celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #启动celery worker
4.创建周期性任务
from datetime import datetime, timedelta
import json
import os,django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 项目名称
django.setup()
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)
# 带参数的创建方法,如下:
PeriodicTask.objects.create(
interval=schedule, # 上面创建10秒的间隔 interval 对象
name='test_task', # 设置任务的name值
task='mission.tasks.my_task', # 指定需要周期性执行的任务
args=json.dumps([10, 2, 76]),
expires=datetime.utcnow() + timedelta(seconds=30)
)
详解创建周期性任务的方法
创建基于interval的周期性任务
第一步创建间隔对象
schedule, created = IntervalSchedule.objects.get_or_create(
every=10,
period=IntervalSchedule.SECONDS,
)
IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒
第二步创建任务
无参数的创建方法:
PeriodicTask.objects.create(
interval=schedule, # we created this above.
name='test_task', # simply describes this periodic task.
task='app名.tasks.任务函数名', # name of task.)
有参数的创建方法:
PeriodicTask.objects.create(
interval=schedule, # we created this above.
name='test'_task', # simply describes this periodic task.
task='app名.tasks.任务函数名', # name of task.
args=json.dumps(['arg1', 'arg2']),
kwargs=json.dumps({ 'be_careful': True, }),
expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object):
"""
设备创建,增加周期性任务
"""
def __init__(self, device_obj):
self.device_obj = device_obj
self.periodic_task = PeriodicTask.objects.create(
interval=schedule,
name='test_task',
task='mission.tasks.my_task',
args=json.dumps([self.device_obj.ip])
)
def starttask(self):
"""
启动任务
"""
self.periodic_task.enabled = True
self.periodic_task.save()
def stoptask(self):
"""
停止任务
"""
self.periodic_task.enabled = False
self.periodic_task.save()
def deltask(self):
"""
删除任务
"""
self.periodic_task.delete()
self.periodic_task.save()
创建基于 crontab 的周期性任务
from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
minute='30',
hour='*',
day_of_week='*',
day_of_month='*',
month_of_year='*',
timezone=pytz.timezone('Canada/Pacific')
)
来源:https://www.cnblogs.com/-wenli/p/13746509.html


猜你喜欢
- pytorch retain_graph==True的作用说明总的来说进行一次backward之后,各个节点的值会清除,这样进行第二次bac
- 近日,被同事问及一个产品列表的做法怎么实现?一个产品列表,每个产品列表后面跟一个button,这些button居右对齐。其实这个效果跟新闻列
- 协同过滤在 用户 —— 物品(user - item)的数据关系下很容易收集到一些偏好信息(preference),比如评分。利用这些分散的
- config.php<?phpdefine('APP_DIR', dirname(__FILE__));define(
- Pytest使用的断言是使用python内置的断言assert。Python assert(断言)用于判断一个表达式,在表达式条件为 fal
- 1.什么是Store?上一篇文章说了,Vuex就是提供一个仓库,Store仓库里面放了很多对象。其中state就是数据源存放地,对应于与一般
- 1 引言这段时间在研究美团爬虫,用的是scrapy-redis分布式爬虫框架,奈何scrapy-redis与scrapy框架不同,默认只发送
- 由于xlwt目前只支持xls格式,至于xlsx格式,后面会继续更新import xlwtimport codecsdef Txt_to_Ex
- 1 导言 Microsoft 在Microsoft SQL Server 2000中推出了与XML相关的功能以及Transact-SQL 关
- 引言实现一个视频压缩的功能,性能优良 压缩视频 从61M 到 11M或者80M到15M视频看起来没有太大损伤缺点:耗时20s (win10,
- 其实golang用一个函数可以构建一个并发队列,现在编写一个灵活可控的队列程序先定义一个工作type Worker struct { &nb
- MySQL数据库中如何修改root用户的密码呢?下面总结了修改root用户密码的一些方法1: 使用set password语句修改mysql
- 开始制作符合标准的站点,第一件事情就是声明符合自己需要的DOCTYPE。查看本站首页原代码,可以看到第一行就是:<!DOCTYPE h
- 原来的程序是使用sqlite这个嵌入式数据库作为Remit(code name)的数据源的,因为NHibernate支持这个,然而有一点不好
- 1、df=DataFrame([{‘A':'11','B':'12'},{‘A
- 前言春联是中国传统文化中最具内涵的元素之一,它以对仗工整、简洁精巧的文字描绘美好形象,抒发美好愿望,是中国特有的文学形式,是华人们过年的重要
- 正文方法一:直接使用已知的cookie访问特点:简单,但需要先在浏览器登录原理:简单地说,cookie保存在发起请求的客户端中,服务器利用c
- 本文介绍了python OpenCV学习笔记实现二维直方图,分享给大家,具体如下:官方文档 – https://docs.opencv.or
- 前言SQL Server 是数据库软件中比较常见且实用的软件,它的集成度很高,且功能非常强大。很多类型的网站系统后台数据库都依赖于SQL S
- 前言:最近在做SOSO地图相关开发,遇到相关画圆知识,特此简单记录下来。1.在页面中添加SOSO地图API引用,引用脚本:<scrip