C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)
作者:做自己518 发布时间:2023-12-06 10:45:34
1:RabbitMQ是个啥?(专业术语参考自网络)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
2:使用RabbitMQ有啥好处?
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。
对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,
3:RabbitMq的安装以及环境搭建等:
网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!
3.1:运行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?
4.1什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长
4.2什么时候不使用MQ?
需要实时关注执行结果 (eg:同步调用)
5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)
6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:
Code:
//简单生产端 ui调用者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
//就是简单的队列,生产者
Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完毕!");
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{
using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}
//简单消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{
Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion
7:Work模式
//简单生产端 ui调用者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
//就是简单的队列,生产者
Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完毕!");
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{
using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}
//简单消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{
Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion
8:Fanout
Code:
//就如下的code, 多次生产,3个消费者都可以自动开始消费
//生产者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
}
Console.WriteLine("工作队列模式 生成完毕......!");
Console.ReadLine();
}
}
}
//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(msg);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
}
}
//work消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====Work模式开启了====");
ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
{
Console.WriteLine($"work模式获取到消息{msg}");
});
Console.ReadLine();
}
}
}
//work后端逻辑
public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handserMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handserMsg.Invoke(message);
}
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
9:Direct
Code:
//同一个消息会被多个订阅者消费
//发布者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式,带上了exchange
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
}
Console.WriteLine("发布ok!");
#endregion
Console.ReadLine();
}
}
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
public static void PublishExchangeModel(string exchangeName, string message)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine($" Sent {message}");
}
}
//订阅者
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式 Exchange
ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
{
Console.WriteLine($"订阅到消息:{msg}");
});
#endregion
Console.ReadLine();
}
}
}
//订阅者后端的逻辑
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: "");
Console.WriteLine(" Waiting for msg....");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handlerMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handlerMsg.Invoke(message);
}
}
else
{
Console.WriteLine($"订阅到消息:{message}");
}
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
来源:https://www.cnblogs.com/Fengge518/p/13826983.html
猜你喜欢
- 以下这段C#代码实现的功能是在屏幕上画图的效果!具体代码如下://DllImport所在的名字空间 using System.Ru
- Oracle 数据库,查询增加RowBounds限制查询条数,默认是0到1000条private final static int rowL
- synchronized原理在java中,每一个对象有且仅有一个同步锁。这也意味着,同步锁是依赖于对象而存在。当我们调用某对象的synchr
- Java 多线程方法详解startstart方法 启动线程 在start方法中调用start0方法,而start0是一个
- 本文实例为大家分享了Unity快速生成常用文件夹的具体代码,供大家参考,具体内容如下前言每次打开新工程创建文件夹都很麻烦,写了一个小工具代码
- JSONArray删除元素的两种方式我自个磨出来的,难受JSONArray jsonarray = new JSONArray();Set&
- 前言:根据ThreadPoolExecutor的构造方法,JDK提供了很多工厂方法来创建各种用途的线程池.1 newFixedThreadP
- 一、项目简述功能包括:用户分为宠物,医生,管理员,宠物主人可进行注册选择医生挂号,选择日期,选择号源,医生可进行宠物接诊,管理员可对宠物,医
- 新建一个集合List<Bill> billList = new ArrayList<>();将订单中所有物品的名称提
- 1 前言敏感词过滤就是你在项目中输入某些字(比如输入xxoo相关的文字时)时要能检测出来,很多项目中都会有一个敏感词管理模块,在敏感词管理模
- jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过
- 前言先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的
- 前言序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端序列化的标准解
- c#调用surfer软件,添加应用的方法:1.在项目文件上右键->添加引用2.选择COM标签页3.找到Surfer 9 type li
- 可扩展标记语言(XML)文件是一种标准的文本文件,它使用特定的标记来描述文档的结构以及其他特性。通过将XML转换为PDF,能够便于文件传输及
- 网上的文章基本上都是只有多数据源或只有动态数据源,而最近的项目需要同时使用两种方式,记录一下配置方法供大家参考。应用场景项目需要同时连接两个
- 新建一个类MyPageInterceptor.java(注意在springboot中要添加注解@Component)package com.
- 前言在实际开发中我们经常会与时间打交道,那这就会涉及到一个时间格式转换的问题。接下来会介绍几种在SpirngBoot中如何对时间格式进行转换
- 代码思路:想要循环遍历文件夹下所有子文件夹,就要用到递归。首先判断路径是否存在:是:获取文件判断是否文件夹:是:调用自身,继续获取子文件夹下
- 前言Stream是一个来自数据源的元素队列并支持聚合操作,其中具有以下特性:Stream只负责计算,不存储任何元素,元素是特定类型的对象,形