聊聊通过celery_one避免Celery定时任务重复执行的问题
作者:吾星喵 发布时间:2021-05-20 13:19:12
在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。
参考:https://www.jb51.net/article/226849.htm
有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。
Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
解决步骤
Celery One允许你将Celery任务排队,防止多次执行
安装
pip install -U celery_once
要求,需要Celery4.0,老版本可能运行,但不是官方支持的。
使用celery_once,tasks需要继承一个名为QueueOnce的抽象base tasks
Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
# 一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# 在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
sleep(30)
return "Done!"
要确定配置,需要取决于使用哪个backend进行锁定,查看Backends
在后端,这将覆盖apply_async和delay。它不影响直接调用任务。
在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
..
AlreadyQueued()
graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。
from celery_once import AlreadyQueued
# Either catch the exception,
try:
example.delay(10)
except AlreadyQueued:
pass
# Or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
sleep(30)
return "Done!"
其他功能请访问:https://pypi.org/project/celery_once/
来源:https://www.jianshu.com/p/285dc3d703f4
猜你喜欢
- javascript request.setAttribute()详解request.setAttribute()怎么用的?JS
- 本文实例讲述了Python Scrapy框架:通用爬虫之CrawlSpider用法。分享给大家供大家参考,具体如下:步骤01: 创建爬虫项目
- 如下所示:'''@author: Jacobpc'''import osimport sys
- 原文:Unobtrusive Ajax。今天才看见的一个Presentation,是Jesse Skinner在06年10月发表的。虽然题目
- 在CentOS7下,默认安装的就是python2.7,我现在来教大家如何安装python3:1、首先安装python3.6可能使用的依赖#
- python取余运算符是什么?python取余运算符是%,即表示取模,返回除法的余数。假设变量: a=10,b=20:那么b % a 输出结
- 删除Git缓存的用户名和密码昨天在上传代码的时候提示输入用户名密码,结果输错了3次就没有提示框了,就一直报错(身份验证失败),没办法提交代。
- 引言本集开始,将会深入Document接口。打开或创建一个文档都会产生一个Document对象,它代表文档本身,所以绝大部分文档的操作都会依
- NopCommerce为了实现松耦合的框架设计目的,使用了IOC框架:Autofac。据有人测试,Autofac是性能很好的IOC工具。1、
- Python中的变量作用域一般在函数体外定义的变量成为全局变量,在函数内部定义的变量称为局部变量。全局变量所有作用域都可用,局部变量只能在本
- JAVA程序想要访问数据库,需要进行如下准备:1.安装一个数据库(这里使用mysql免安装版)2.下载该数据库的驱动包(这里使用mysql官
- 0x 00 返回值简介回顾下,上一节简单介绍了函数及其各种参数,其中也有简单介绍 print 和 return 的区别,print 仅仅是打
- 决策树分类与上一篇博客k近邻分类的最大的区别就在于,k近邻是没有训练过程的,而决策树是通过对训练数据进行分析,从而构造决策树,通过决策树来对
- 前言使用axios可以统一做请求响应拦截,例如请求响应时我们拦截响应信息,判断状态码,从而弹出报错信息。请求超时的时候断开请求,还可以很方便
- PyQt5树形结构控件QTreeWidget简介QTreeWidget 类根据预设的模型提供树形显示控件。QTreeWidget 使用类似于
- Python线程与进程进程:进程是程序的一次执行,每个进程都有自己的地址空间、内存、数据栈以及其他记录其运行的辅助数据。线程:所有的线程运行
- 一、概念触发器是一种特殊类型的存储过程,不由用户直接调用。创建触发器时会对其进行定义,以便在对特定表或列作特定类型的数据修改时执行。触发器可
- 本文利用Python3启动简单的HTTP服务器,以实现在同一网络中共享本地文件。启动HTTP服务器打开终端,转入目标文件所在文件夹,键入以下
- 一、使用ddt和data装饰器的大致框架如下,每个test_开头的方法,代表一条测试用例from ddt import ddt,dataim
- 数据API数据集方法不会修改数据集,而是创建新数据集。可通过调用 map() 方法将转换应用于每个元素:dataset = dataset.