django+celery+RabbitMQ自定义多个消息队列的实现
作者:Hello_Mr_Zheng 发布时间:2021-01-21 16:52:10
标签:django,celery,RabbitMQ,消息队列
关于django celery的使用网上有很多文章,本文就不多做更多的说明。
本文使用版本
python==3.8.15
Django==3.2.4
celery==5.2.7
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from kombu import Exchange, Queue
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zkcelery.settings')
app = Celery('zkcelery')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# 看了一篇文章说,如果使用redis做broker,exchange可以不配置;但如果使用rabbitMQ做broker,就必须要配置。
queue = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('q1', exchange=Exchange('e1', type='direct'), routing_key='r1'),
Queue('q2', exchange=Exchange('e2', type='direct'), routing_key='r2'),
Queue('q3', exchange=Exchange('e3', type='fanout'), routing_key='r3'),
)
# 一旦配置了route后,所有的任务名都必须要指定route,否则任务无法执行。
# 经过测试,route匹配是最长匹配规则。
route = {
'apps.zhiding.tasks.add': {'queue': 'q1', 'routing_key': 'r1'},
'apps.zhiding.tasks.multiply': {'queue': 'q2', 'routing_key': 'r2'},
# 其它的任务名称,匹配这条路由
# 如果以上队列的worker服务器坏了,这些任务会被全部放进这个队列里,该队列的worker将继续处理这些任务
# 下面这条队列一定要配置,否则其它任务无法处理。
'*': {'queue': 'default', 'routing_key': 'default'},
}
app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)
tasks.py
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
print('任务睡眠2秒后执行了')
return x + y
@shared_task
def multiply(x, y):
time.sleep(5)
print('任务睡眠5秒后执行了')
return x * y
@shared_task
def sub(x, y):
time.sleep(4)
print('任务睡眠4秒后执行了')
return x - y
笔者也看了很多博文,在settings.py配置文件中写入CELERY_QUEUES和CELERY_ROUTES,上面的配置对应下来就是如下代码块:
CELERY_QUEUES = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('sq1', exchange=Exchange('sq1', type='direct'), routing_key='sq1'),
Queue('sq2', exchange=Exchange('sq2', type='direct'), routing_key='sq2'),
Queue('sq3', exchange=Exchange('sq3', type='fanout'), routing_key='sq3'),
)
CELERY_ROUTES = {
'apps.zhiding.tasks.add': {'queue': 'sq1', 'routing_key': 'sq1'},
'apps.zhiding.tasks.multiply': {'queue': 'sq2', 'routing_key': 'sq2'},
'*': {'queue': 'default', 'routing_key': 'default'},
}
但是笔者在实际使用中发现后面这种方式配置始终未生效,不知道是不是笔者版本的不同,没有做更多的研究,如果你能找到问题的原因,欢迎评论交流。
启动worker
# 笔者使用的windows,启动时需要加上-P eventlet
celery -A zkcelery worker -l info -P eventlet
启动后队列中出现配置中的个队列
同时会在rabbitmq中创建(如果不存在)4个队列,交换机和相应的绑定关系(当然也可以直接通过rabbitmq管理端直接创建自己需要的队列、交换机和绑定,具体根据个人习惯或者视工作场景而定选择)
以队列q1示例:
暂时先关闭worker,便于观察消息队列中的消息。
向队列中发送几条消息,消息均进入到配置中指定的queue中
再次启动worker,队列中的消息立马被消费
如何做到消费指定的队列中的消息,只需要启动的时候加上参数Q
# -Q指定消费的队列
# -n 指定worker节点的名称,避免启动多个时的重名冲突
celery -A zkcelery worker -l info -Q q1 -n node1 -P eventlet
可以看到终端中queues只有q1了
q1中的消息被消费掉了,其他队列没有变化
也可以同时指定多个消费队列
celery -A zkcelery worker -l info -Q q2,default -n node2 -P eventlet
当然也可以在生产方指定推送的队列,举例如下:
来源:https://blog.csdn.net/Hello_Mr_Zheng/article/details/129145139
0
投稿
猜你喜欢
- 加密解密字符串的asp函数,如用于ASP链接地址栏参数的加密,看代码就明白。比如:show.asp?id=DB26538FA54C70E1E
- 你喜欢在博客文章中使用图片吗?是的,如果不是很麻烦的话,相信大家都不会介意放上几张漂亮的图片来点缀一下内容的,不过你的图片可能会导致下面的两
- 不用切图,只要设置基本的 图片及其属性即可!用鼠标右键控制图片翻转!<style>*{ FONT-SIZE: 12px; }se
- 本文实例讲述了PHP实现的curl批量请求操作。分享给大家供大家参考,具体如下:<?php$ch = array();$res = a
- # set 不支持索引和切片,是一个无需的不重复得到容器# 类似于字典,但是只有key 没有value# 创建集合dic1={}set1={
- 最近在看《深度学习:基于Keras的Python实践(魏贞原)》这本书,书中8.3创建了一个Scikit-Learn的Pipeline,首先
- 代码如下:Class Vector Private vector_datas() Private&n
- show.php源代码: <? if ($action=="cp"){ echo"<div ali
- 想要追赶 Python 的热潮,应该如何学习呢?现在许多人在自学之外,都会选择去培训机构学习。选择培训机构的的标准是什么呢?什么样的pyth
- 这里我们在SQL Server 2005\SQL Server 2008版本上通过举例子,说明临时表和表变量两者的一些特征,让我们对临时表和
- 比如:我们导入了某个客户的资料,我们知道此客户的姓名是ZhangShan,我们想知道,在我们的业务数据库(eg:NorthWind)中,有哪
- 本文以实例演示5种验证码,并介绍生成验证码的函数。PHP生成验证码的原理:通过GD库,生成一张带验证码的图片,并将验证码保存在Session
- #!/usr/bin/python# -*- coding: utf-8 -*-from scapy.all import *from ti
- python之pkl文件pkl文件是python里面保存文件的一种格式,如果直接打开会显示一堆序列化的东西。cPickle在python3中
- 另:@会影响效率如:@mysql_connect() 可以导出错误,但会影响效率。mysql与mysqli的区别:
- 只添加了一些自己想到的常用的功能,欢迎大家补充添加自己的好的思路. 通用的正则和方法可以写在RegExpObj中,
- 前因后果公司有人阳了,今天在家上班,突然小姨子就问我有没有baidu文库会员,想下载点东西,我心想这还要会员?用Python不是分分钟的事情
- 我们来编写一个,引用时用:<!--#include Virtual="page.inc"-->语句即可:pa
- 一、Session 的概念cookie 是在浏览器端保存键值对数据,而 session 是在服务器端保存键值对数据 session 的使用依
- 1、字符串的索引与获取字符串的索引方式与列表的索引方式是一样的。只不过列表是每个元素的自身就有一个索引位置,而字符串是每个字符就有一个索引位