RabbitMq如何做到消息的可靠性投递
作者:Onemorelight95 发布时间:2024-04-23 09:36:57
如何保证消息不丢失
在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:
消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c)。
发送端的确认
Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式
和return 退回模式。
confirm 确认模式:消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)
参数设置为false
,表示同意发送者将当前channel信道设置为confirm模式。
return 退回模式:消息从 exchange–>queue 投递失败,会将消息退回给producer。
消费端的确认
消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack
,有两种确认方式:自动确认和手动确认。
在编码中,关于消息的确认方式,我们需要在消费者端调用Consumer
函数时,设置第三个参数:autoAck
是false还是true(false表示手动,true表示自动)。
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false)
,手动签收,如果出现异常,则调用d.Reject(true)
让其自动重新发送消息。
Go 实现
安装操作库
安装API库
Go可以使用streadway/amqp
库来操作rabbit,使用以下命令来安装:
go get github.com/streadway/amqp
封装rabbitmq
接下来我们对streadway/amqp
库的内容进行一个二次封装,封装为一个rabbitmq.go
文件:
package rabbitmq
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
// RabbitMQ RabbitMQ结构
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
// Connect 连接服务器
func Connect(s string) *RabbitMQ {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
mq := new(RabbitMQ)
mq.channel = ch
return mq
}
// New 初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string, name string) *RabbitMQ {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
q, e := ch.QueueDeclare(
name, //队列名
false, //是否开启持久化
true, //不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e, "初始化消息队列失败!")
mq := new(RabbitMQ)
mq.channel = ch
mq.Name = q.Name
return mq
}
// QueueDeclare 声明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
failOnError(e, "声明queue失败!")
}
// QueueDelete 删除queue
func (q *RabbitMQ) QueueDelete(queue string) {
_, e := q.channel.QueueDelete(queue, false, true, false)
failOnError(e, "删除queue失败!")
}
// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
e := q.channel.Qos(1, 0, false)
failOnError(e, "无法设置QoS")
}
// NewExchange 初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string, name string, typename string) {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e, "初始化交换机失败!")
}
// ExchangeDelete 删除交换机
func (q *RabbitMQ) ExchangeDelete(exchange string) {
e := q.channel.ExchangeDelete(exchange, false, true)
failOnError(e, "删除交换机失败!")
}
// Bind 绑定消息队列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e, "绑定队列失败!")
q.exchange = exchange
}
// Send 向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失败!")
e = q.channel.Publish(
"", //交换
queue, //路由键
false, //必填
false, //立即
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
msg := "向队列:" + q.Name + "发送消息失败!"
failOnError(e, msg)
}
// Publish 向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失败!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo: q.Name,
Body: []byte(str)},
)
failOnError(e, "向交换机发送消息失败!")
}
// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(
q.Name, //指定从哪个队列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e, "接收消息失败!")
return c
}
// Close 关闭队列连接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
发送端的确认
首先初始化消息队列的时候,我们要开启confirm模式
,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)
参数设置为false
,表示同意发送者将当前channel信道设置为confirm模式。
func New(s string, name string) *RabbitMQ {
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
q, e := ch.QueueDeclare(
name, //队列名
false, //是否开启持久化
true, //不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e, "初始化消息队列失败!")
mq := new(RabbitMQ)
mq.channel = ch
mq.Name = q.Name
// 设置为confirm模式
mq.channel.Confirm(false)
return mq
}
然后在封装库中创建一个函数handleConfirm()
用于接收来自Borker的回复:
func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
return q.channel.NotifyPublish(ch)
}
生产者
生产者端在向Broker发送消息的时候,我们使用一个无缓冲的通道来接收来自Broker的回复,然后创建一个协程监听这个无缓冲通道。
func main() {
producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
// 指定为topic类型
rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
go handleConfirm(confirm)
var i int
for {
time.Sleep(time.Second)
producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
i++
}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
for {
select {
case message := <-confirm:
fmt.Println("接收到来自Broker的回复:", message)
}
}
}
运行结果:
接收到来自Broker的回复: {1 true}
接收到来自Broker的回复: {2 true}
接收到来自Broker的回复: {3 true}
接收到来自Broker的回复: {4 true}
接收到来自Broker的回复: {5 true}
消费端的确认
首先将Consume
函数的第三个参数autoAck
参数标记为false:
// Consume 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(
q.Name,
"",
false, // 不自动确认消息
false,
false,
false,
nil,
)
failOnError(e, "接收消息失败!")
return c
}
在消费者端我们采用公平派遣模式,即队列发送消息给消费者的时候,不再采用轮询机制,而是一个消费者消费完消息之后,会调用Ack(false)
函数向队列发送一个回复,队列每次会将消息优先发送给消费完消息的消费者(回复过)。
消费端限流:
实现公平派遣模式我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:
// Qos 配置queue参数
func (q *RabbitMQ) Qos() {
e := q.channel.Qos(1, 0, false)
failOnError(e, "无法设置QoS")
}
生产者
func main() {
producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
// 指定为direct类型
rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
i := 0
for {
time.Sleep(time.Second)
producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
i = i + 1
}
}
消费者1
消费者2在消费第三条消息的时候,假设发生了错误,我们调用d.Reject(true)
函数让队列重新发送消息。
func main() {
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
// 指定一次只消费一条消息,直到消费完才重新接收
consumer1.Qos()
// 队列绑定到exchange
consumer1.Bind("exchange", "key1")
//接收消息
msgs := consumer1.Consume()
go func() {
var i int
for d := range msgs {
time.Sleep(time.Second * 1)
log.Printf("Consumer1 received a message: %s", d.Body)
// 假设消费第三条消息的时候出现了错误,我们就调用d.Reject(true),队列会重新发送消息给消费者
if i == 2 {
d.Reject(true)
} else {
// 消息消费成功之后就回复
d.Ack(false)
}
i++
}
}()
select {}
}
消费者2
func main() {
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
// 指定一次只消费一条消息,直到消费完才重新接收
consumer2.Qos()
// 队列绑定到exchange
consumer2.Bind("exchange", "key1")
//接收消息
msgs := consumer2.Consume()
go func() {
for d := range msgs {
time.Sleep(time.Second * 5)
log.Printf("Consumer2 received a message: %s", d.Body)
// 消息消费成功之后就回复
d.Ack(false)
}
}()
select {}
}
运行结果:
# 消费者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"# 消费者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"
来源:https://blog.csdn.net/qq_49723651/article/details/127717187


猜你喜欢
- 该脚本的功能是卸载android手机中安装的所有第三方应用,主要是使用adb shell pm、adb uninstall 命令,所以使用的
- 人工神经网络的许多算法已在智能信息处理系统中获得广泛采用,尤为突出是是以下4种算法:ART网络、LVQ网络、Kohonen网络Hopfiel
- 本文实例讲述了Python wxPython库使用wx.ListBox创建列表框。分享给大家供大家参考,具体如下:如何创建一个列表框?列表框
- 优化可能带来的问题优化不总是对一个单纯的环境进行,还很可能是一个复杂的已投产的系统。优化手段本来就有很大的风险,只不过你没能力意识到和预见到
- 创建一个优秀的可视化图表的关键在于引导读者,让他们能理解图表所讲述的故事。在一些情况下,这个故事可以通过纯图像的方式表达,不需要额外添加文字
- 本文实例讲述了PHP实现的获取文件mimes类型工具类。分享给大家供大家参考,具体如下:<?php/* * Copyright 201
- import导入包搜索路径import用于导入包:import ( "fmt"
- 本文实例讲述了Mysql数据库中数据表的优化、外键与三范式用法。分享给大家供大家参考,具体如下:数据表优化将商品信息表进行优化1.创建商品种
- 爬取网站时经常会遇到需要登录的问题,这是就需要用到模拟登录的相关方法。python提供了强大的url库,想做到这个并不难。这里以登录学校教务
- 添加字段的语法:alter table tablename add (column datatype [default value][nul
- 内容摘要:网页设计师制作网页最常用的设计软件应该就算adobe的产品Photoshop了,当然Photoshop不仅可以设计网页,不过作为网
- 目录一、Python执行外部命令1、subprocess模块简介2、subprocess模块的遍历函数3、subprocess模块的Pope
- 用于绘制直线的line函数;用于绘制椭圆的ellipse函数;用于绘制矩形的rectangle函数;用于绘制圆的circle函数;用于绘制填
- 发现问题在使用pip install下载其他包时,报了错,如图:提示:“You are using pip versio
- 大家知道直接使用ASP是不能够重启服务器的,这时我们需要制作一个组件来实现功能,ASP通过这个组件调用系统API,然后按照不同的重启和关机方
- 同质化的网站越来越多,往往你还没发展起来,就已有许多站点抄袭走了你的成果,如何留下用户?——让用户有更好的使用体验。一些网页上的小技巧,可以
- 前一阵子工作项目上的事情忙的焦头烂额,最近要进行部门调整将要去做新的项目。又要学习很多新的知识了,还是很兴奋激动的。今天下班回来查看了一下V
- 编辑距离编辑距离(Edit Distance),又称Levenshtein距离,是指两个字串之间,由一个转成另一个所需的最少编辑操作次数。编
- os.makedir(path)和os.makedirs(path)今天工作中将hadoop文件同步到服务器磁盘,由于文件类别目录较多,迁移
- 本文介绍了Python3网络爬虫之使用User Agent和 * 隐藏身份,分享给大家,具体如下:运行平台:WindowsPython版本