django+celery+RabbitMQ自定义多个消息队列的实现
作者:Hello_Mr_Zheng 发布时间:2021-01-21 16:52:10
标签:django,celery,RabbitMQ,消息队列
关于django celery的使用网上有很多文章,本文就不多做更多的说明。
本文使用版本
python==3.8.15
Django==3.2.4
celery==5.2.7
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from kombu import Exchange, Queue
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zkcelery.settings')
app = Celery('zkcelery')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# 看了一篇文章说,如果使用redis做broker,exchange可以不配置;但如果使用rabbitMQ做broker,就必须要配置。
queue = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('q1', exchange=Exchange('e1', type='direct'), routing_key='r1'),
Queue('q2', exchange=Exchange('e2', type='direct'), routing_key='r2'),
Queue('q3', exchange=Exchange('e3', type='fanout'), routing_key='r3'),
)
# 一旦配置了route后,所有的任务名都必须要指定route,否则任务无法执行。
# 经过测试,route匹配是最长匹配规则。
route = {
'apps.zhiding.tasks.add': {'queue': 'q1', 'routing_key': 'r1'},
'apps.zhiding.tasks.multiply': {'queue': 'q2', 'routing_key': 'r2'},
# 其它的任务名称,匹配这条路由
# 如果以上队列的worker服务器坏了,这些任务会被全部放进这个队列里,该队列的worker将继续处理这些任务
# 下面这条队列一定要配置,否则其它任务无法处理。
'*': {'queue': 'default', 'routing_key': 'default'},
}
app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)
tasks.py
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
print('任务睡眠2秒后执行了')
return x + y
@shared_task
def multiply(x, y):
time.sleep(5)
print('任务睡眠5秒后执行了')
return x * y
@shared_task
def sub(x, y):
time.sleep(4)
print('任务睡眠4秒后执行了')
return x - y
笔者也看了很多博文,在settings.py配置文件中写入CELERY_QUEUES和CELERY_ROUTES,上面的配置对应下来就是如下代码块:
CELERY_QUEUES = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('sq1', exchange=Exchange('sq1', type='direct'), routing_key='sq1'),
Queue('sq2', exchange=Exchange('sq2', type='direct'), routing_key='sq2'),
Queue('sq3', exchange=Exchange('sq3', type='fanout'), routing_key='sq3'),
)
CELERY_ROUTES = {
'apps.zhiding.tasks.add': {'queue': 'sq1', 'routing_key': 'sq1'},
'apps.zhiding.tasks.multiply': {'queue': 'sq2', 'routing_key': 'sq2'},
'*': {'queue': 'default', 'routing_key': 'default'},
}
但是笔者在实际使用中发现后面这种方式配置始终未生效,不知道是不是笔者版本的不同,没有做更多的研究,如果你能找到问题的原因,欢迎评论交流。
启动worker
# 笔者使用的windows,启动时需要加上-P eventlet
celery -A zkcelery worker -l info -P eventlet
启动后队列中出现配置中的个队列
同时会在rabbitmq中创建(如果不存在)4个队列,交换机和相应的绑定关系(当然也可以直接通过rabbitmq管理端直接创建自己需要的队列、交换机和绑定,具体根据个人习惯或者视工作场景而定选择)
以队列q1示例:
暂时先关闭worker,便于观察消息队列中的消息。
向队列中发送几条消息,消息均进入到配置中指定的queue中
再次启动worker,队列中的消息立马被消费
如何做到消费指定的队列中的消息,只需要启动的时候加上参数Q
# -Q指定消费的队列
# -n 指定worker节点的名称,避免启动多个时的重名冲突
celery -A zkcelery worker -l info -Q q1 -n node1 -P eventlet
可以看到终端中queues只有q1了
q1中的消息被消费掉了,其他队列没有变化
也可以同时指定多个消费队列
celery -A zkcelery worker -l info -Q q2,default -n node2 -P eventlet
当然也可以在生产方指定推送的队列,举例如下:
来源:https://blog.csdn.net/Hello_Mr_Zheng/article/details/129145139


猜你喜欢
- 学了几天终于大概明白pytorch怎么用了这个是直接搬运的官方文档的代码之后会自己试着实现其他nlp的任务# Author: Robert
- 一、函数的定义及其应用 所谓函数,就是把具有独立功能的代码块组织成为一个小模块,在需要的时候调用函数的使用包含两个步骤1.定义函数–封装独立
- 一、验证计算机服务列表中是否有MySQL服务(1)右击桌面上的计算机图标出现菜单列表选项,选中管理(有多种方式可以打开计算机本地服务列表,这
- Python打包分发工具setuptools:曾经 Python 的分发工具是 distutils,但它无法定义包之间的依赖关系。setup
- 前言昨天才开始接触,鼓捣了一个下午,接下来会持续更新,如果哪里有错误的地方,望各位大佬指出,谢谢!数据描述两个文件,一个文件包含了网络图的节
- user-define-session-inc.php文件代码:<?php function mysession_open($save
- 在上一篇文章中,我介绍了MySQL对XML支持的部分功能,包括--xml命令行选项,以及MySQL 5.1.5中开始引入的新功能。今天我将介
- 本文实例分析了Python字符串和文件操作常用函数。分享给大家供大家参考。具体如下:# -*- coding: UTF-8 -*-'
- ghhs("nav01","li"); // 鼠标经过时变色 ghh
- <% class menusPublic Title, ID, Image, TitleColor, Target, Backgrou
- 安全等于运算符(<=>)这个操作符和=操作符执行相同的比较操作,不过<=>可以用来判断NULL值。在两个操作数均为N
- JavaScript正变得越来越流行,它已经成为前端开发的第一选择,并且利用基于JavaScript语言的NodeJS,我们也可以开发出高性
- 今天真的被编码问题一直困扰着,午休都没进行。也真的见识到了各种编码。例如:gbk,unicode、utf-8、ansi、gb2312等。如果
- 引言在Python中但凡提到的赋值运算符其实讲的就是等号=,在编程语言中的等号含义再也不是数学中的1+1=2的这种等号,真实含义是将=右侧的
- 简介:在视频相关测试场景下,例如:有时需要知道全部视频的汇总时长,显然一个个打开并且手工计算耗时耗力,我们可以通过编写脚本进行快速汇总。获取
- 功能介绍 (需要版本5.0.45)大数据操作ORM性能瓶颈在实体转换上面,并且不能使用常规的Sql去实现当列越多转换越慢,SqlSugar将
- 当前,数据可视化已经成为数据科学领域非常重要的一部分。不同网络系统中产生的数据,都需要经过适当的可视化处理,以便更好的呈现给用户读取和分析。
- torch.optim的灵活使用详解1. 基本用法:要构建一个优化器Optimizer,必须给它一个包含参数的迭代器来优化,然后,我们可以指
- 输入任意一个大写字母,生成金字塔图形def GoldTa(input): L = [chr(i) for i in range(
- 在整个安装的过程中也遇到了很多的坑,故此做个记录,争取下次不再犯!我的整个基本配置如下:电脑环境如下:win10(64位)+CPU:E5-2