Python中celery的使用
作者:fivenian 发布时间:2022-10-22 14:03:17
Celery简介
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。
版本支持情况:
Celery version 4.0 runs on
Python ❨2.7, 3.4, 3.5❩
PyPy ❨5.4, 5.5❩
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you're running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.
Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。
celery的异步任务
celery的使用
1.安装celery
$ pip install -U celery
1)安装相关依赖
$ pip install "celery[redis,auth,msgpack]"
序列化程序
celery[auth]
用于使用auth
安全序列化程序。
celery[msgpack]
用于使用 msgpack 序列化程序。
celery[redis]
使用 Redis 作为消息传输或结果后端。
2.安装redis
这里我们使用redis作为celery的broker,作为任务队列的存储和结果的存储。
对于 Redis 支持,您必须安装其他依赖项。您可以使用celery[redis]
bundle一次性安装 Celery 和这些依赖项:
$ pip install -U "celery[redis]"
1)配置
配置很简单,只需配置你的 Redis 数据库的位置:
app.conf.broker_url = 'redis://localhost:6379/0'
其中 URL 的格式为:
redis://:password@hostname:port/db_number
方案后面的所有字段都是可选的,并且将默认为localhost
端口 6379,使用数据库 0。
3.使用ceelry
1)首先我们可以创建一个celery的文件夹,然后创建一个tasks.py文件
celery/tasks.py
from celery import Celery
# 第一个参数就是当前脚本的名称,backend 任务执行结果的存储地址broker 任务队列的存储地址
app = Celery('tasks', backend='redis://127.0.0.1', broker='redis://127.0.0.1')
@app.task
def add(x, y):
return x + y
celery/run_tasks.py
from tasks import add
result = add.delay(1, 2)
print('Is task ready: %s' % result.ready()) # False说明任务还没有执行完
run_result = result.get(timeout=1)
print('task result: %s' % run_result)
print('Is task ready: %s' % result.ready())
4.启动celery
$ cd celry
$ celery -A tasks worker --loglevel=info
使用flower监控celery任务的执行情况
pip install flower
启动flower,指定我们的应用,确保你的celery是启动的。
cd celery
celery -A tasks flower --broker=redis://@localhost:6379/0
运行结果:
celery [celery args] flower [flower args].
[I 210825 10:54:00 command:152] Visit me at http://localhost:5555
[I 210825 10:54:00 command:159] Broker: redis://127.0.0.1:6379//
[I 210825 10:54:00 command:160] Registered tasks:
我们就可以通过5555端口看到celery异步任务的运行情况了

Django中使用celery
官方地址:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
1.创建celery文件
根据官方文档的说明,我们可以直接在Django项目同名的应用下创建celery.py文件
recruitment/recruitment/celery.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SEttINGS_MODULE', 'recruitment.base') # 这里我把配置文件放到了根目录下的settings/base.py 中
app = Celery('recruitment')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
def debug_task(self):
print(f'Request: {self.request!r}')
然后我们需要在这个celery.py文件所在的目录的__init__
文件中添加:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when/保证所有app下的任务都能导入进来
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
2.添加celery配置
settings/base.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")
3.在别的应用下使用celery执行异步任务 [使用celery异步发送钉钉群消息通知]
1.首先我们需要在应用下创建一个tasks.py文件interview/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from .dingtalk import send
@shared_task
def send_dingtalk_message(message):
send(message)
interview/dingtalk.py
from dingtalkchatbot.chatbot import DingtalkChatbot
from django.conf import settings
def send(message, at_mobiles=[]):
# 引用 settings里面配置的钉钉群消息通知的WebHook地址:
webhook = settings.DINGTALK_WEB_HOOK
# 初始化机器人小Y,
xiaoY = DingtalkChatbot(webhook)
# 方式二:勾选“加签”选项时使用(v1.5以上新功能)
# xiaoY = DingtalkChatbot(webhook, secret=secret)
# Text消息@所有人
xiaoY.send_text(msg=('消息通知: %s' % message), at_mobiles=at_mobiles)
interview.views.py
from interview.tasks import send_dingtalk_message
def notify_interview(modeladmin, request, queryset):
candidates = ''
interviewers = ''
for obj in queryset:
candidates = obj.userame + '' + candidates
interviewers = obj.first_interviewer_user + '' + interviewers
# 这里的消息发送到钉钉, 或者通过 Celery 异步发送到钉钉
send_dingtalk_message.delay('候选人 %s 进入面试环节, 亲爱的面试官请做好面试准备:%s。' % (candidates, interviewers))
4.启动celery服务
启动celery服务,到我们的项目根目录启动,然后执行
$ celery -A recruitment worker -l info
如果需要制定配置文件,如果在mac下可以执行:
$ DJANGO_SEttINGS_MODULE=settings.base celery --app=recruitment worker --loglevel=info
启动flower监控异步任务
$ celery -A recruitment flower --broker=redis://localhost:6379/0
celery定时任务
来源:https://www.cnblogs.com/fivenian/p/15184850.html


猜你喜欢
- window.opener,是通过window.open打开子窗体的父窗体的引用。 比如在父窗体parentForm里面,通过window.
- 将训练好的模型参数保存起来,以便以后进行验证或测试,这是我们经常要做的事情。tf里面提供模型保存的是tf.train.Saver()模块。模
- 最近用sysbench进行了较多的性能测试,也总结一下它的特点和用法和需要注意的事项。sysbench是一个多线程性能测试工具,可以进行CP
- QueryCache(下面简称QC)是根据SQL语句来cache的。一个SQL查询如果以select开头,那么MySQL服务器将尝试对其使
- 概括、从python1.6开始就可以处理unicode字符了。 一、几种常见的编码格式。 1.1、ascii,用1个字节表示。 1.2、UT
- 语法 SET IDENTITY_INSERT [ database.[ owner.] ] { table } { ON | OFF } 参
- 获得当前日期+时间(date + time)函数:now()mysql> select now();+----------------
- Pynimate介绍Pynimate是python第三方用于动态可视化的数据模块。安装pip install pynimate实验示例fro
- 一般情况下,导出超时可能都是以下三种情况:一、sql语句复杂,查询时间过长;二、处理查询后数据逻辑冗余;三、数据量过大导致响应超时。接下来分
- 🚀 文章简介 :本篇文章的实战部分中主要使用到了 MediaPipe 与 OpenCv 两个库,实现了隔空操作的效果,主要有**隔空操作鼠标
- Tensorflow数据读取有三种方式:Preloaded data: 预加载数据Feeding: Python产生数据,再把数据喂给后端。
- 最近尝试了mysql的最新版本5.5.581.最新发布的mysql 5.5.8没有适合solaris 9的版本;这个在安装时需要注意2.它发
- .data文件格式.data文件是用来存储数据的一种文件格式。之前通常数据是用逗号隔开或tab健隔开的格式,现在也可能是文本文件格式或二进制
- 前言VScode是一个相当优秀的IDE,具备开源、跨平台、模块化、插件丰富、启动时间快、颜值高、可高度定制等等优秀的特质,不愧是微软爸爸的私
- 防止一般的采集以及小偷读取,加在顶部。同理,可以改造成JS脚本。下面的方法是通过选择同一IP的访问频率来达到防止采集的目的,就是可能也把搜索
- 本文实例讲述了Django框架基础模板标签与filter使用方法。分享给大家供大家参考,具体如下:一、基本的模板语言1、变量{{ }}1.1
- Worksheet 对象的 rows 属性和 columns 属性得到的是一 Generator 对象,不能用中括号取索引。可先用列表推导式
- 本文实例讲述了Python使用matplotlib实现交换式图形显示功能。分享给大家供大家参考,具体如下:一 代码from random i
- 题目:有四个数字:1、2、3、4,能组成多少个互不相同且无重复数字的三位数?各是多少?程序分析:可填在百位、十位、个位的数字都是1、2、3、
- 今天写了一个获取当前公网ip并且自动断开宽带连接的文件,和大家分享下。这个文件的具体用途大家懂的,可以尽管拿去用,不过目前只适用于Windo