聊聊通过celery_one避免Celery定时任务重复执行的问题
作者:吾星喵 发布时间:2021-05-20 13:19:12
在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。
参考:https://www.jb51.net/article/226849.htm
有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。
Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
解决步骤
Celery One允许你将Celery任务排队,防止多次执行
安装
pip install -U celery_once
要求,需要Celery4.0,老版本可能运行,但不是官方支持的。
使用celery_once,tasks需要继承一个名为QueueOnce的抽象base tasks
Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
# 一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# 在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
sleep(30)
return "Done!"
要确定配置,需要取决于使用哪个backend进行锁定,查看Backends
在后端,这将覆盖apply_async和delay。它不影响直接调用任务。
在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
..
AlreadyQueued()
graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。
from celery_once import AlreadyQueued
# Either catch the exception,
try:
example.delay(10)
except AlreadyQueued:
pass
# Or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
sleep(30)
return "Done!"
其他功能请访问:https://pypi.org/project/celery_once/
来源:https://www.jianshu.com/p/285dc3d703f4
猜你喜欢
- 1. torch- torchvision- python版本对应关系2. CUDA Toolkit 和PyTorch的对应关系3. 安装说
- 本文实例讲述了PHP实现的微信公众号扫码模拟登录功能。分享给大家供大家参考,具体如下:PHP微信公众号扫码模拟登录功能功能只是将:https
- 在HTML中,常见的URL有多种表示方式:相对URL: example.php demo
- 有使用过VS2005开发工具的朋友或者其他语句如js中都有Try catch 语句块,那么在mysql中是否能有SQLserver的@@er
- 有时你会发现你写的视图函数是十分类似的,只有一点点的不同。 比如说,你有两个视图,它们的内容是一致的,除了它们所用的模板不太一样:# url
- 现状≠将来?程序员做设计本身就很悲哀,纠结于客户与坚持之间就更是如此。无论我今后的路会怎么走,我想始终不变的事情就是与客户博弈了。无论是放弃
- 在建立与服务器的连接时出错。在连接到 SQL Server 2005 时,在默认的设置下 SQL Server 不允许进行远程连接可能会导致
- 任何使用yield的函数都称之为生成器,如:def count(n): while n > 0: &nb
- 基于很多原因,有不少朋友的空间不支持FSO,当大量的页面生成静态页面时,占用大量的空间,修改样式时,不方便,所以动态页面还是有动态页面的好处
- 一、前言卷起来好吧,元旦已经过了,就开始写文章模式了。这篇文章会对完全数的各种侦测进行详细解释。写作不易,支持一波~二、完全数是什么1、定义
- matplotlib是功能十分强大的绘制二维图形的Python模块,它用Python语言实现了MATLAB画图函数的易用性,同时又有非常强大
- 什么是 go-cachego-cache 是一个轻量级的基于内存的 K-V 储存组件,内部实现了一个线程安全的 map[strin
- 本文实例讲述了Python使用Selenium模块模拟浏览器抓取斗鱼直播间信息。分享给大家供大家参考,具体如下:import timefro
- 1 算术运算add(other)比如进行数学运算加上具体的一个数字data['open'].add(1)2018-02-27
- 导语日常开发中,定位程序异常,追溯事件发生场景都需要通过日志记录的方式。可以说一个好的开发日志设计可以让开发人员在后续项目维护的过程中节省时
- 本文实例讲述了Python时间和字符串转换操作。分享给大家供大家参考,具体如下:例子:#!/usr/bin/python# -*- codi
- 在添加新记录时,有时候我们需要将部分动态内容自动添加到指定的文本域,例如上传一个文件后,将文件名自动添加到一个独立的记录框内,以
- 1、字典中的键存在时,可以通过字典名+下标的方式访问字典中改键对应的值,若键不存在则会抛出异常。如果想直接向字典中添加元素可以直接用字典名+
- 在Microsoft SQL Server 2000中,用于数据存储的实用工具是数据库。数据库的物理表现是操作系统文件,即在物理上,一个数据
- JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,它使得人们很容易的进行阅读和编写。同时也方便了