Django中使用Celery的方法示例
作者:栖迟于一丘 发布时间:2021-08-05 06:12:08
起步
在 《分布式任务队列Celery使用说明》 中介绍了在 Python 中使用 Celery 来实验异步任务和定时任务功能。本文介绍如何在 Django 中使用 Celery。
安装
pip install django-celery
这个命令使用的依赖是 Celery 3.x 的版本,所以会把我之前安装的 4.x 卸载,不过对功能上并没有什么影响。我们也完全可以仅用Celery在django中使用,但使用 django-celery 模块能更好的管理 celery。
使用
可以把有关 Celery 的配置放到 settings.py 里去,但我比较习惯单独一个文件来放,然后在 settings.py 引入进来:
# celery_config.py
import djcelery
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = (
'app.tasks',
)
# 有些情况可以防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 4
# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True
# 每个worker执行了多少任务就会销毁,防止内存泄露,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分钟内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"
# 设置详细的队列
CELERY_QUEUES = {
"default": { # 这是上面指定的默认队列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"beat_queue": {
"exchange": "beat_queue",
"exchange_type": "direct",
"routing_key": "beat_queue"
}
}
配置文件中设置了 CELERY_IMPORTS 导入的任务,所以在django app中创建相应的任务文件:
# app/tasks.py
from celery.task import Task
import time
class TestTask(Task):
name = 'test-task' # 给任务设置个自定义名称
def run(self, *args, **kwargs):
print('start test task')
time.sleep(4)
print('args={}, kwargs={}'.format(args, kwargs))
print('end test task')
在 settings.py 添加:
INSTALLED_APPS = [
# ...
'djcelery',
]
# Celery
from learn_django.celery_config import *
触发任务或提交任务可以在view中来调用:
# views.py
from django.http import HttpResponse
from app.tasks import TestTask
def test_task(request):
# 执行异步任务
print('start do request')
t = TestTask()
t.delay()
print('end do request')
return HttpResponse('ok')
启动 woker 的命令是:
python manage.py celery worker -l info
再启动django,访问该view,可以看到任务在worker中被消费了。
定时任务
在celery的配置文件 celery_config.py 文件中添加:
CELERYBEAT_SCHEDULE = {
'task1-every-1-min': { # 自定义名称
'task': 'test-task', # 与任务中name名称一致
'schedule': datetime.timedelta(seconds=5),
'args': (2, 15),
'options': {
'queue': 'beat_queue', # 指定要使用的队列
}
},
}
通过 options 的 queque 来指定要使用的队列,这里需要单独的队列是因为,如果所有任务都使用同一队列,对于定时任务来说,任务提交后会位于队列尾部,任务的执行时间会靠后,所以对于定时任务来说,使用单独的队列。
启动 beat:
python manage.py celery beat -l info
监控工具 flower
如果celery中的任务执行失败了,有些场景是需要对这些任务进行监控, flower 是基于 Tornado 开发的web应用。安装用 pip install flower ;启动它可以是:
python manage.py celery flower
# python manage.py celery flower --basic_auth=admin:admin
用浏览器访问 http://localhost:5555 即可查看:
来源:http://www.hongweipeng.com/index.php/archives/1680/


猜你喜欢
- 在默认情况下,我们使用PyQt5创建出来的窗口和部件都是默认的样式,虽然谈不上很丑,但是也毫无美感可言。其实,在PyQt5中,我们可以有较高
- 前言说到面向对象,大家都不陌生。关系型数据库也是后端日常用来存储数据的,但数据库是关系型的,因此,ORM通过对象模型和数据库的关系模型之间建
- 译注:这是一篇在Stack overflow上很热的帖子。提问者自称已经掌握了有关Python OOP编程中的各种概念,但始终觉得元类(me
- 本文实例为大家分享了Python实现简单扫雷游戏的具体代码,供大家参考,具体内容如下#coding: utf-8__note__ = &qu
- 本文实例讲述了GO语言映射(Map)用法。分享给大家供大家参考。具体如下:映射是一种内置的数据结构,用来保存键值对的无序集合。(1)映射的创
- 简介最近在整理我们项目代码的时候,发现有很多活动的代码在结构和提供的功能上都非常相似。为了方便今后的开发,我花了一点时间编写了一个生成代码框
- '-----------------------------------------------------------
- 启动服务发生1067错误:1.删除datadir 下的用户数据库文件 和 日志文件 (ib_logfile0,ib_logfile1)。2.
- 最近简单的对oracle,mysql,sqlserver2005的数据分页查询作了研究,把各自的查询的语句贴出来供大家学习.....(一)、
- 实现一个树形表格的时候有多种方法:比如把 ztree 的树形直接拼接成表格,或者用强大的 jqgrid 实现,今天介绍一个比较轻量级的实现:
- 代码: import os while True: dynamic = input('输入计算表达式:') if dynam
- 1、例子:拟合一种函数Func,此处为一个指数函数。出处:SciPy v1.1.0 Reference Guide#Headerimport
- 在Goland中,如果 import 了包,但在代码中没有使用,会自动帮你移除这个包的 引用有可能是习惯问题,每次写代码都习惯 先impor
- --查询 SELECT tp.tp_id, tp.tpmc, tp.leveid, tp.tpdz, tp.jgm, tp.scsj, tp
- 阅读上一篇:垂直栅格与渐进式行距(上) 新问题来也匆匆,去也“冲冲”。距上次发布垂直栅格与渐进式行距(上)发布,已经不知不觉过去了
- 今天学习到python的读取文件部分。还是以一段代码为例:filename='programming.txt'with op
- python中有很多字符串连接方式,今天在写代码,顺便总结一下:最原始的字符串连接方式:str1 + str2python 新字符串连接语法
- 展示import tkinter if __name__ == '__main__': win =
- 概述在python中,以单下划线开头的(_a)的代表不能直接访问的类属性,需通过类提供的接口进行访问,不能用“from
- 博主最近需要做一个物流信息查询,就去网上搜索一个快递鸟的API接口,返回值是以JSON格式,只需要返回是转成数组就能轻松实现各种实例了。下图