python使用pika库调用rabbitmq交换机模式详解
作者:IT之一小佬 发布时间:2024-01-01 06:28:18
前言:
交换机模式主要包括:交换机之发布订阅、交换机之关键字和交换机之通配符。
1、交换机之发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
生产者模式:
示例代码:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的fanout的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 发布订阅模式参数
# 3.向logs交换机中插入数据:"Hello world"
message = 'info:Hello World!'
channel.basic_publish(exchange='logs',
routing_key='',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
运行结果:
消费者模式:
示例代码:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的fanout的交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs', queue=queue_name)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:【将程序重复执行三次,三个消费者都收到了同样的消息】
2、交换机之关键字
生产者模式:
示例代码: 【将info分别改为warning、error运行】
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的direct的交换机
channel.exchange_declare(exchange='logs2', exchange_type='direct') # 发布订阅模式参数
# 3.向logs交换机中插入数据:"Hello world"
message = 'info:Hello World!'
channel.basic_publish(exchange='logs2',
routing_key='info', # info信息
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
运行结果:
消费者模式:
示例代码1:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的direct的交换机
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='waring')
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='error')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
示例代码2:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的direct的交换机
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
示例代码3:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的direct的交换机
channel.exchange_declare(exchange='logs2', exchange_type='direct')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='error')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info', 'waring', 'error']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
3、交换机之通配符
通配符交换机”与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN....”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。而在消费者一段,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。
“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“*”仅匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
生产者模式:
示例代码: 【分别将routing_key改为usa.news、news.usa和usa.weather执行一遍】
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的topic的交换机
channel.exchange_declare(exchange='logs3', exchange_type='topic') # 发布订阅模式参数
# 3.向logs交换机中插入数据:"Hello world"
message = 'usa.news---------'
channel.basic_publish(exchange='logs3',
routing_key='usa.news', # usa.news
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
运行结果:
消费者模式:
示例代码1:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的topic的交换机
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='news.#')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
示例代码2:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的topic的交换机
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='#.news')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
示例代码3:
import pika
# 1.连接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104'))
channel = connection.channel()
# 2.声明一个名为logs类型的topic的交换机
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 3.创建队列
result = channel.queue_declare("", exclusive=True) # 随机生成一个队列名
queue_name = result.method.queue
print(queue_name)
# 4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key='#.weather')
# # 使用for循环将指定队列绑定到交换机上
# for key in ['info.#', 'waring.#', 'error.#']:
# channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=key)
# 5.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 6.确定监听队列参数
channel.basic_consume(queue=queue_name, # 指定队列
auto_ack=False, # 手动应答方式
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 7.正式监听
channel.start_consuming()
运行结果:
来源:https://blog.csdn.net/weixin_44799217/article/details/126533663
猜你喜欢
- 1、将css与javascript全部用下边的方法分离到外部文件中去。<link rel="stylesheet"
- 本文介绍一款工具 go-callvis,它能够将 Go 代码的调用关系可视化出来,并提供了可交互式的 web 服务。go get -u gi
- 一、安装配置PHP1、下载Php的版本zip包之后,解压缩到指定目录。下载地址:http://www.php.net/downl
- 你是否知道JavaScript其实也是一个函数式编程语言呢?本指南将教你如何利用JavaScript的函数式特性。要求:你应当已经对Java
- HTML5,被传为Flash 的杀手,是一种用于web 应用程序开发、具有变革意义的网络技术。HTML 5提供了一些新的元素和属性,其中有些
- zy3287 问:<script src="js.js?id=999" type="text/javas
- “/xxxxx”应用程序中的服务器错误。 -------------------------------------------------
- 看起来现在经常用到这样的效果来提高用户体验,所以就没事写了一个输入框提示列表的效果使用宽屏的朋友麻烦帮忙测试下,列表的位置有没有错位。代码可
- 1、实现效果2、实现步骤模块导入import os,sys,timefrom PyQt5 import QtCore,QtWidgets,Q
- 前言在 Go 单元测试这个系列的第二部分 数据库的Mock测试 中我们介绍了用 go-sqlmock 给数据库的 CRUD 操作做Mock
- RSS(Really Simple Syndication)是一种描述和同步网站内容的格式,是使用最广泛的XML应用。RSS搭建了信息迅速传
- 介绍init 方法通常用在初始化一个类实例时候,但其实它不是实例化一个类的时候第一个被调用 的方法。当使用 Student(id, name
- 一、 在数据库排序查询优化上的差异。在讲解这个内容之前,为了读者能够清楚我讲的内容,我要先谈一个概念。命中率,它是指从内存中取得数据而不从磁
- 前言for...in 是Python程序员使用最多的语句,for 循环用于迭代容器对象中的元素,这些对象可以是列表、元组、字典、集合、文件,
- 学习自然语言的最好方法就是溶入相应的语言环境在交流中学习,学习一种编程语言的最好方法就是看例程。为了帮助大家建立wml应用的第一印象,所以请
- 前言简单介绍下python的几个自动求导工具,tangent、autograd、sympy;在各种机器学习、深度学习框架中都包含了自动微分,
- 删除文件os.remove( filename ) # filename: "要删
- 目录结构:只需在自己的python项目下随便创建一个文件夹(下图中为:daka),然后将下载的chromedriver.exe、ask_fo
- 单线程同步使用socket传输数据使用json序列化消息体struct将消息编码为二进制字节串,进行网络传输消息协议// 输入{  
- [PHP] ; PHP还是一个不断发展的工具,其功能还在不断地删减 ; 而php.ini的设置更改可以反映出相当的变化,