python基于celery实现异步任务周期任务定时任务
作者:yaominghui 发布时间:2021-06-14 05:20:26
这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
hello, 小伙伴们, 好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助.
首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据, 因此可以用它提供的接口快速实现并管理一个分布式的任务队列,它本身不是任务队列,它是封装了操作常见任务队列的各种操作, 可以使用它快速进行任务队列的使用与管理.在Python中的组成部分是 1.用户任务 app 2.管道 broker 用于存储任务 官方推荐的是 redis rabbitMQ / backend 用于存储任务执行结果的 3, 员工 worker 大致流程入下:
最左边的是用户, 用户发起1个请求给服务器, 要服务器执行10个任务,将这10个任务分给10个调度器,即开启10个线程进行任务处理,worker会一直监听调度器是否有任务, 一旦发现有新的任务, 就会立即执行新任务,一旦执行完就会返回给调度器, 即backend, backend会将请求发送给服务器, 服务器将结果返回给用户, 表现的结果就是,这10个任务同时完成,同时返回,,这就是Celery的整个工作流程, 其中的角色分别为,任务(app_work), 调度器(broker + backend), 将任务缓存的部分, 即将所有任务暂时存在的地方,相当于生产者, 消费者(worker 可以指定数量, 即在创建worker命令的时候可以指定数量), 在worker拿到任务后,人就控制不了了, 除非把worker杀死, 不然肯定会执行完.
也即 任务来了以后, 调度器(broker)去缓存任务, worker去执行任务, 完成后返回backend,接着返回,
还有就是关于定时任务和周期任务在linux上为什么不用自身所带着的去做,是因为linux周期定时任务是不可控的, 不好管理, 返回值保存也是个麻烦事, 而celery只要开启着调度器, 就可以随时把人物结果获取到,即使用celery控制起来是非常方便的.
接下来就是实例代码:
workers.py
from celery import Celery
import time
# 创建一个Celery实例, 就是用户的应用app 第一个参数是任务名称, 可以随意起 后面的就是配置的broker和backend
diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379")
# 接下来是为应用创建任务 ab
@diaoduqi.task
def ab(a,b):
time.sleep(15)
return a+b
brokers.py
from worker import ab
# 将任务交给Celery的Worker执行
res = ab.delay(2,4)
#返回任务ID
print(res.id)
backends.py
from celery.result import AsyncResult
from worker import diaoduqi
# 异步获取任务返回值
async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi)
# 判断异步任务是否执行成功
if async_task.successful():
#获取异步任务的返回值
result = async_task.get()
print(result)
else:
print("任务还未执行完成")
为了方便,现在直接将三个文件代表的部分命名在文件名称中.首先是启动workers.py
启动方式是依据系统的不同来启动的, 对于linux下 celery worker -A workers -l INFO 也可以指定开启的worker数量 即在后面添加的参数是 -c 5 表示指定5个worker 理论上指定的worker是无上限的,
在windows下需要安装一个eventlet模块进行运行, 不然不会运行成功 pip install eventlet 可以开启线程 不指定数量是默认6个worker, 理论上worker的数量可以开启无限个,但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 开启5个worker 执行
该命令后 处于就绪状态, 需要发布任务, 即brokers.py进行任务发布, 方法是使用delay的方式执行异步任务, 返回了一个任务id, 接着去backends.py中取这个任务id, 去查询任务是否完成,判定条件即任务.successful 判断是否执行完, 上面就是celery异步执行任务的用法与解释
接下来就是celery在项目中的应用
在实际项目中应用celery是有一定规则的, 即目录结构应该如下.
结构说明 首先是创建一个CeleryTask的包,接着是在里面创建一个celery.py,必须是这个文件 关于重名的问题, 找寻模块的顺序是先从当前目录中去寻找, 根本找不到,接着是从内置模块中去找, 根本就找不到写的这个celery这个文件,
celery.py
from celery import Celery
DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379",
include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])
TaskOne.py
import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
# time.sleep(3)
return a+b
@DDQ.task
def one2():
time.sleep(2)
return "one2"
taskTwo.py
import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
time.sleep(2)
return "two1"
@DDQ.task
def two2():
time.sleep(3)
return "two2"
getR.py
from CeleryTask.TaskOne import one1 as one
# one.delay(10,10)
# two.delay(20,20)
# 定时任务我们不在使用delay这个方法了,delay是立即交给task 去执行
# 现在我们使用apply_async定时执行
# 首先我们要先给task一个执行任务的时间
import datetime, time
# 获取当前时间 此时间为东八区时间
ctime = time.time()
# 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法
utc_time = datetime.datetime.utcfromtimestamp(ctime)
# 为当前时间增加 10 秒
add_time = datetime.timedelta(seconds=10)
action_time = utc_time + add_time
# action_time 就是当前时间未来10秒之后的时间
# 现在我们使用apply_async定时执行
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
print(res.id)
# 这样原本延迟5秒执行的One函数现在就要在10秒钟以后执行了
接着是在命令行cd到与CeleryTask同级目录下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 这样 就开启了worker 接着去 发布任务, 在定时任务中不再使用delay这个方法了,
delay是立即交给ttask去执行, 在这里使用 apply_async定时执行 指的是调度的时候去定时执行
需要设置的是UTC时间, 以及定时的时间(多长时间以后执行) 之后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令开启worker, 之后运行 getR.py文件发布任务, 可以看到在定义的时间以后执行该任务
周期任务
周期任务 指的是在指定时间去执行任务 需要导入的一个模块有 crontab
文件结构如下
结构同定时任务差不多,只不过需要变动一下文件内容 GetR文件已经不需要了,可以删除.
celery.py
from celery import Celery
from celery.schedules import crontab
DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379",
include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"])
# 我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
DDQ.conf.beat_schedule = {
"each10s_task": {
"task": "CeleryTask.TaskOne.one1",
"schedule": 10, # 每10秒钟执行一次
"args": (10, 10)
},
"each1m_task": {
"task": "CeleryTask.TaskOne.one2",
"schedule": crontab(minute=1) # 每1分钟执行一次 也可以替换成 60 即 "schedule": 60
}
}
TaskOne.py
import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
# time.sleep(3)
return a+b
@DDQ.task
def one2():
time.sleep(2)
return "one2"
taskTwo.py
import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
time.sleep(2)
return "two1"
@DDQ.task
def two2():
time.sleep(3)
return "two2"
以上配置完成以后,这时候就不能直接创建worker了,因为要执行周期任务,需要首先有一个任务的生产方, 即 celery beat -A CeleryTask, 用来产生创建者, 接着是创建worker worker的创建命令还是原来的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 创建完worker之后, 每10秒就会由beat创建一个任务给 worker去执行.至此, celery创建异步任务, 周期任务,定时任务完毕, 伙伴们自己拿去测试吧.
来源:https://www.cnblogs.com/f-g-f/p/11300656.html


猜你喜欢
- 前言在进行一个应用系统的开发过程中,从上到下一般需要四个构件:客户端-业务逻辑层-数据访问层-数据库,其中数据访问层是一个底层、核心的技术。
- 文件操作是开发中经常遇到的场景,那么如何判断一个对象是文件对象呢?下面我们总结了3种常见的方法。方法1:比较类型第一种方法,就是判断对象的t
- 主要有五个方面:一..HTML页面转UTF-8编码问题二.PHP页面转UTF-8编码问题三.MYSQL数据库使用UTF-8编码的问题四.JS
- 1.安装Pillowpip install Pillow2.安装tesseract-ocrgithub地址: https://gi
- 本文实例为大家分享了JavaScript实现动态生成表格的具体代码,供大家参考,具体内容如下功能描述在输入框中输入行和列,点击按钮,生成拥有
- 今天要实现的功能就是以下这个功能:vue.js模拟日历插件好了废话不多说了 直接上代码了css:*{ margi
- SQL Server中事务日志的作用:持续记录数据库所有的事务和这些事务对数据库所做的修改;一旦数据库出现灾难事件,就需要事务日志来进行近期
- 本来是在找交通识别的程序,然后凑巧看见了证件照换底,于是学习了一下~一开始在网上找了一个很普遍写的程序,但是效果并不好,想要放弃了,然后看见
- 一、前言在近半年的 Python 命令行旅程中,我们依次学习了 argparse 、 docopt 、 click 和 fire 库的特点和
- 导语之前有很多小伙伴说想学习一下多线程图片下载器,虽然好像已经过去很久了,不过还是上来安排一波吧。至于题目为什么说是构建一个小型数据集,因为
- phpMyAdmin错误 缺少 mysqli 扩展。请检查 PHP 配置 的解决方案phpMyAdmin 缺少 mysqli 扩展。请检查
- 在工作中遇到过 个问题执行一条代码时间过长 而且还不报错,卡死在那。还要继续执行下面代码,如何操作。下面是个简单的实例pip安装 第三方ev
- 废话不多说,估计只有我这个菜鸟废了2个小时才搞出来,主要是我想了太多方法来实现,最后都因为这因为那的原因失败了间接说明自己对可变与不可变类型
- Hpack 是啥Hpack 是 HTTP2 的头部压缩算法。在 HTTP1 中,每次传输都会有大量的 Header 携带,我们可以拿一个实际
- phpinfo() 功能描述:输出 PHP 环境信息以及相关的模块、WEB 环境等信息。 危险等级:中 passthru() 功能描述:允许
- 这篇文章主要介绍了基于python traceback实现异常的获取与处理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参
- 开始这是去年的问题了,今天在整理邮件的时候才发现这个问题,感觉顶有意思的,特记录下来。在表RelationGraph中,有三个字段(ID,N
- 一: 删除LOG1:分离数据库 企业管理器->服务器->数据库->右键->分离数据库2:删除LOG文件3:附加数据库 企业管理器->服务器-
- 如果有人问你,GET和POST,有什么区别?你会如何回答?真实案例 前几天有人问我这个问题。
- python 自动化批量生成前端的HTML可以大大减轻工作量下面演示两种生成 HTML 的方法方法一:使用 webbrowser#codin