利用Python操作消息队列RabbitMQ的方法教程
作者:Shawn 发布时间:2022-12-11 21:52:16
前言
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
应用场景:
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:
1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。
2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。
3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。
一、安装环境
首先是在 Linux 上安装 rabbitmq
# 环境为CentOS 7
yum install rabbitmq-server # 安装RabbitMQ
systemctl start rabbitmq-server # 启动
systemctl enable rabbitmq-server # 开机自启
systemctl stop firewall-cmd # 临时关闭防火墙
然后用 pip 安装 Python3 的开发包
pip3 install pika
安装好软件之后可以访问http://115.xx.xx.xx:15672/来访问自带的 web 页面来查看和管理 RabbitMQ。默认管理员的用户密码都是guest
二、简单的向队列中加入消息
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
print("sent...")
# 关闭连接
connection.close()
三、简单的从队列中获取消息
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 定义一个回调函数
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()
四、万一消费者掉线了
想象这样一种情况:
消费者从消息队列中获取了 n 条数据,正要处理呢结果宕机了,那该怎么办?在 RabbieMQ 中有一个 ACK 可以用来确认消费者处理结束。就有点类似网络中的 ACK,消费者每次从队列中获取了数据之后队列不会立刻将数据移除,而是等待对应的 ACK。消费者获取到数据并处理完成之后会向队列发送一个 ACK 包,通知 RabbitMQ 这堆消息已经处理妥当了,可以删除了,这时候 RabbitMQ 才会将数据从队列中移除。所以这种情况下即使消费者掉线也没有什么问题,数据依旧会在队列中存在,留给其他消费者处理。
在 Python 中这样实现:
消费者有这样一行代码channel.basic_consume(callback, queue='test_queue', no_ack=False)
,其中no_ack=False
表示不发送确认包。将其修改为no_ack=True就会在每次处理完之后向 RabbitMQ 发送一个确认包,以确认消息处理完毕。
五、万一 RabbitMQ 宕机了呢
虽然有了 ACK 包,但是万一 RabbitMQ 挂了那数据还是会损失。所以我们可以给 RabbitMQ 设置一个数据持久化存储。RabbitMQ 会将数据持久化存储到磁盘上,保证下次再启动的时候队列还在。
在 Python 中这样实现:
我们声明一个队列是这样的channel.queue_declare(queue='test_queue')
,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True)
。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
。
六、最简单的发布订阅
最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。
发布者代码:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='my_fanout', # 指定exchange
routing_key='', # fanout下不需要配置,配置了也不会生效
body=message)
connection.close()
订阅者代码:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='my_fanout',
queue=queue_name)
# 定义回调方法
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# 从队列获取信息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
来源:https://blog.just666.cn/index.php/archives/63/


猜你喜欢
- 去空格函数有如下两种:·LTRIM()LTRIM() 函数把字符串头部(左)的空格去掉,其语法如下:LTRIM (<character
- 本文实例讲述了Python使用dict.fromkeys()快速生成一个字典。分享给大家供大家参考,具体如下:>>> re
- 在没学习开窗函数之前,我们都知道,用了分组之后,查询字段就只能是分组字段和聚合的字段,这带来了极大的不方便,有时我们查询时需要分
- 详解 Python中LEGB和闭包及装饰器LEGB L>E>G?BL:local函数内部作用域E:enclosing函数内部与内
- 一、 collections 中 defaultdict 的使用1.字典的键映射多个值将下面的列表转成字典l = [('a'
- 本文记录了python 3.7.0 安装配置方法,供大家参考,具体内容如下S1 登入Python官网下载网址S2 下载后缀为exe的可执行文
- iniconf博主前两天在写一个小的go语言项目,想找一个读取ini格式配置和的三方库,在网上找了一圈感觉都不是很好用, 使用起来非常的奇怪
- 今天接到测试人员反应,测试环境前端应用程序无连接mysql数据库,登录mysql服务器,查看错误日志,发现有如下报错:ERROR 1135
- 一:自动化了解知识工具安装什么样的项目适合做自动化?自动化测试一般在什么阶段开始实施?你们公司自动化的脚本谁来维护?如何维护?自动化用例覆盖
- 背景:ALTER作为DDL语言之一,工作中经常遇到,这里我们简单介绍一下常见的几种使用场景新建两个测试表offices 和 employes
- 1.表结构 2.表数据 3.查询teacher_name字段不能等于空并且也不能等于空字符SELECT * FROM s
- 为何选Nuxt.js?在前后端分离出现之前,传统的web页面都是服务端渲染的,如JSP、PHP、Python Django,还有各种模板技术
- 科讯5.0 标签和之前版本变化不大,如果用老版本的科讯,可以参考这个标签使用。相关文章:新云4.0 模板通用标签说明 标签清单:======
- 网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket,一般在配置部署mysql环境时都会在mysql的m
- python版本为python3.51.要求1)输入用户名密码2)认证成功后显示欢迎信息3)输错三次后锁定2.需求分析1)用户信息存储在文件
- 我就废话不多说了,还是直接看代码吧!import matha=1;//边1b=1;//边2c=math.sqrt(2);//边3A=math
- 本文将讲述vue-cli+vux-scroller实现移动端的上拉加载功能:纠错声明:网上查阅资料看到很多人都将vux和vuex弄混,在这里
- PHP registerXPathNamespace() 函数实例为下一个 XPath 查询创建命名空间上下文:<?php $xml=
- 本文实例讲述了python追加元素到列表的方法。分享给大家供大家参考。具体实现方法如下:scores = ["1",&q
- 本文主要是利用Python的第三方库Pillow,实现单通道灰度图像的颜色翻转功能。# -*- encoding:utf-8 -*-impo