python队列通信:rabbitMQ的使用(实例讲解)
作者:ywq935 发布时间:2021-01-11 05:04:55
(一)、前言
为什么引入消息队列?
1.程序解耦
2.提升性能
3.降低多业务逻辑复杂度
(二)、python操作rabbit mq
rabbitmq配置安装基本使用参见上节文章,不再复述。
若想使用python操作rabbitmq,需安装pika模块,直接pip安装:
pip install pika
1.最简单的rabbitmq producer端与consumer端对话:
producer:
#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #save auth indo
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth)) #connect to rabbit
channel = connection.channel() #create channel
channel.queue_declare(queue='hello') #declare queue
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!') #the body is the msg content
print(" [x] Sent 'Hello World!'")
connection.close()
consumer:
#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #auth info
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth)) #connect to rabbit
channel = connection.channel() #create channel
channel.queue_declare(queue='hello') #decalre queue
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消息传递消费过程中,可以在rabbit web管理页面实时查看队列消息信息。
2.持久化的消息队列,避免宕机等意外情况造成消息队列丢失。
consumer端无需改变,在producer端代码内加上两个属性,分别使消息持久化、队列持久化,只选其一还是会出现消息丢失,必须同时开启:
delivery_mode=2 #make msg persisdent
durable=True
属性插入位置见如下代码(producer端):
#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.queue_declare(queue='test1',durable=True) #durable=Ture, make queue persistent
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
exchange='',
routing_key='test1',
body=msg,
properties=pika.BasicProperties(
delivery_mode=2 #make msg persisdent
)
)
print('Send done:',msg)
connection.close()
3.公平分发
在多consumer的情况下,默认rabbit是轮询发送消息的,但有的consumer消费速度快,有的消费速度慢,为了资源使用更平衡,引入ack确认机制。consumer消费完消息后会给rabbit发送ack,一旦未ack的消息数量超过指定允许的数量,则不再往该consumer发送,改为发送给其他consumer。
producer端代码不用改变,需要给consumer端代码插入两个属性:
channel.basic_qos(prefetch_count= *) #define the max non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbitmq
属性插入位置见如下代码(consumer端):
#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
)
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
print('Recv:',body)
time.sleep(5)
chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_qos(prefetch_count=1)
'''
注意,no_ack=False 注意,这里的no_ack类型仅仅是告诉rabbit该消费者队列是否返回ack,若要返回ack,需要在callback内定义
prefetch_count=1,未ack的msg数量超过1个,则此consumer不再接受msg,此配置需写在channel.basic_consume上方,否则会造成non_ack情况出现。
'''
channel.basic_consume(
callback,
queue='test2'
)
channel.start_consuming()
三、消息发布/订阅
上方的几种模式都是producer端发送一次,则consumer端接收一次,能不能实现一个producer发送,多个关联的consumer同时接收呢?of course,rabbit支持消息发布订阅,共支持三种模式,通过组件exchange转发器,实现3种模式:
fanout: 所有bind到此exchange的queue都可以接收消息,类似广播。
direct: 通过routingKey和exchange决定的哪个唯一的queue可以接收消息,推送给绑定了该queue的consumer,类似组播。
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,类似前缀列表匹配路由。
1.fanout
publish端(producer):
#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
)
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
exchange_type='fanout'
)
msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time()
channel.basic_publish(
exchange='hello',
routing_key='',
body=msg,
properties=pika.BasicProperties(
delivery_mode=2
)
)
print('send done')
connection.close()
subscribe端(consumer):
#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
)
)
channel=connection.channel()
channel.exchange_declare(
exchange='hello',
exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) #随机与rabbit建立一个queue,comsumer断开后,该queue立即删除释放
queue_name=random_num.method.queue
channel.basic_qos(prefetch_count=1)
channel.queue_bind(
queue=queue_name,
exchange='hello'
)
def callback(chann,deliver,properties,body):
print('Recv:',body)
chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_consume(
callback,
queue=queue_name,
)
channel.start_consuming()
实现producer一次发送,多个关联consumer接收。
使用exchange模式时:
1.producer端不再申明queue,直接申明exchange
2.consumer端仍需绑定队列并指定exchange来接收message
3.consumer最好创建随机queue,使用完后立即释放。
随机队列名在web下可以检测到:
2.direct
使用exchange同时consumer有选择性的接收消息。队列绑定关键字,producer将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,consumer相应接收。即在fanout基础上增加了routing key.
producer:
#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
)
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
exchange_type='direct',
)
while True:
route_key=input('Input routing key:')
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
exchange='direct_log',
routing_key=route_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2
)
)
connection.close()
consumer:
#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
exchange='direct_log',
exchange_type='direct'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
queue=queue_name,
exchange='direct_log',
routing_key=route_key
)
def callback(chann,deliver,property,body):
print('Recv:[level:%s],[msg:%s]' %(route_key,body))
chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
同时开启多个consumer,其中两个接收notice,两个接收warning,运行效果如下:
3.topic
相较于direct,topic能实现模糊匹配式工作方式(在consumer端指定匹配方式),只要routing key包含指定的关键字,则将该msg发往绑定的queue上。
rabbitmq通配符规则:
符号“#”匹配一个或多个词,符号“”匹配一个词。因此“abc.#”能够匹配到“abc.m.n”,但是“abc.*‘' 只会匹配到“abc.m”。‘.'号为分割符。使用通配符匹配时必须使用‘.'号分割。
producer:
#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
)
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
exchange_type='topic',
)
while True:
route_key=input('Input routing key:')
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
exchange='topic_log',
routing_key=route_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2
)
)
connection.close()
consumer:
#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
'192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
exchange='topic_log',
exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
queue=queue_name,
exchange='topic_log',
routing_key=route_key
)
def callback(chann,deliver,property,body):
print('Recv:[type:%s],[msg:%s]' %(route_key,body))
chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
运行效果:
rabbitmq三种publish/subscribe模型简单介绍完毕。
来源:http://blog.csdn.net/ywq935/article/details/78633428
猜你喜欢
- 概述考虑这样一个问题,有hello.py脚本,输出”hello, world!”;有TestInput.py脚本,等待用户输入,然后打印用户
- 目录1.事件循环2.协程和异步编程2.1 基本使用2.2 await2.3 Task对象1.事件循环可以理解成为一个死循环,去检查任务列表中
- 50个常用sql语句 Student(S#,Sname,Sage,Ssex) 学生表 Course(C#,Cname,T#) 课程表 SC(
- 本文使用pygame实现播放mp3,文中用到pygame及mutagen库,安装:pip install pygamepip install
- ewebeditor支持兼容IE8 的方法方法:前几天ie8正式公布了,当天中午我就去下载了一个迫不急待的将自己的浏览器升级到ie8,偶还刻
- 以前没怎么仔细的研究过ajax,只是用到了就直接拿过来用,发现了问题再找解决方法.以下是我在找解决问题的过程中的一点小小的总结. 一.谈Aj
- 零、本讲学习目标了解面向对象编程思想掌握类和对象的定义和使用了解Python中的对象一、面向对象(一)程序员“面向对象”在现实世界中存在各种
- 列表列表是Python中最具灵活性的有序集合对象类型。与字符串不同的是,列表可以包含任何类型的对象:数字、字符串甚至其他列表。列表是可变对象
- 有时候我们需要在程序里执行一些cmd命令,使用os或者其它模块中的popen方法去执行这个问题一般是程序内有输入导致的,这个输入可以是inp
- 背景使用python操作一批同样分辨率的图片,合并为tiff格式的文件。由于opencv主要用于读取单帧的tiff文件,对多帧的文件支持并不
- 置信椭圆原理及椭圆图形绘制置信椭圆长短轴计算def confidence_oval(self,factor, ppf_rate): &nbs
- 目录当前时间实例1:实例2:指定时间戳实例1:实例2:总结我们将会启用到time库:当前时间实例1:import time# 获得当前时间时
- 之前一直傻傻分不清(n,1)和(n,)之间的区别,这里做一下总结,希望度过此文章的小伙伴都能明白。例如如果我们创建一个包含10个整型数的数组
- group by函数应该的使用应该是SELECT 列表中指定的每一列也必须出现在 GROUP BY 子句中,除非这列是用于聚合函数,但是今天
- 前言今天在学习python的过程中,发现python没有switch这个语法。于是就想在python中如何才能实现这个功能呢?正文本文中我们
- Flask提供的模板引擎为Jinja2,易于使用,功能强大。模板仅仅是文本文件,它可以生成任何基于文本的格式(HTML、XML、CSV、La
- 前言今天给大家带来的是Vue 3 中的极致防抖/节流(含常见方式防抖/节流)这篇文章,文章中不仅会讲述原来使用的防抖或节流方式,还会带来新的
- 新手,虽然比较简单的东西,但是弄了我很久。很多不完善的地方,比如锁定用户,同一用户输入错三次密码就会锁定,但是如果在第二第三次换了用户再输入
- 在日常的python编程中使用这几个函数来简化我们的编程工作,经常使用能使编程效率大大地提高。1. Map 函数map函数可以使用另外一个函
- 本文实例讲述了JS中的算法与数据结构之集合(Set)。分享给大家供大家参考,具体如下:集合(Set)同数学中所学的一样,集合(Set)是由一