C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)
作者:做自己518 发布时间:2023-12-06 10:45:34
1:RabbitMQ是个啥?(专业术语参考自网络)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
2:使用RabbitMQ有啥好处?
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。
对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,
3:RabbitMq的安装以及环境搭建等:
网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!
3.1:运行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?
4.1什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长
4.2什么时候不使用MQ?
需要实时关注执行结果 (eg:同步调用)
5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)
6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:
Code:
//简单生产端 ui调用者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
//就是简单的队列,生产者
Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完毕!");
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{
using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}
//简单消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{
Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion
7:Work模式
//简单生产端 ui调用者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
//就是简单的队列,生产者
Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完毕!");
Console.ReadLine();
}
}
}
/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{
using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}
//简单消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}
#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{
Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion
8:Fanout
Code:
//就如下的code, 多次生产,3个消费者都可以自动开始消费
//生产者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
}
Console.WriteLine("工作队列模式 生成完毕......!");
Console.ReadLine();
}
}
}
//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(msg);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
}
}
//work消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====Work模式开启了====");
ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
{
Console.WriteLine($"work模式获取到消息{msg}");
});
Console.ReadLine();
}
}
}
//work后端逻辑
public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handserMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handserMsg.Invoke(message);
}
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
9:Direct
Code:
//同一个消息会被多个订阅者消费
//发布者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式,带上了exchange
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
}
Console.WriteLine("发布ok!");
#endregion
Console.ReadLine();
}
}
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
public static void PublishExchangeModel(string exchangeName, string message)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine($" Sent {message}");
}
}
//订阅者
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式 Exchange
ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
{
Console.WriteLine($"订阅到消息:{msg}");
});
#endregion
Console.ReadLine();
}
}
}
//订阅者后端的逻辑
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: "");
Console.WriteLine(" Waiting for msg....");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handlerMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handlerMsg.Invoke(message);
}
}
else
{
Console.WriteLine($"订阅到消息:{message}");
}
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
来源:https://www.cnblogs.com/Fengge518/p/13826983.html


猜你喜欢
- 本文实例为大家分享了Java swing读取txt文件实现学生考试系统的具体代码,供大家参考,具体内容如下主要实现了一个简单的倒计时答题系统
- Rmb.javapublic class Rmb { /** *人民币的基本信息和操作 *@auth
- @ConditionalOnMissingBean,它是修饰bean的一个注解,主要实现的是,当你的bean被注册之后,如果而注册相同类型的
- Java多线程下载网图案例此案例依赖——文件操作工具类(FileUtils)使用 apache 的commons-io包下的FileUtil
- springboot整合jwt步骤:1、登录时,验证账号和密码成功后,生成jwt,返回给前端;2、前端接收后保存,再做其他操作,比如增删改查
- Kotlin 基础教程之类、对象、接口Kotlin中类、接口相关概念与Java一样,包括类名、属性、方法、继承等,如下示例:interfac
- 工欲善其事,必先利其器很多程序员可能都忘了记录应用程序的行为是一件多么重要的事,当遇到多线程环境下高压力导致的并发bug时,你就能体会到记录
- 定义工具类-创建对应的日志对象定义枚举类-存储定义的日志文件名称logback.xml里配置对应的日志名称和日志等级1、工具类 Logger
- 这篇文章主要介绍了Java中遍历ConcurrentHashMap的四种方式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一
- 一、自定义菜单的说明和按钮类型1、菜单说明1)自定义菜单最多包括3个一级菜单,每个一级菜单最多包含5个二级菜单。2)一级菜单最多4个汉字,二
- 前言本篇文章 中写到的是 flutter 调用了Android 原生的 TextView 案例添加原生组件的流程基本上可以描述为:1 and
- 原理比较简单,引入System.Reflection命名空间,利用反射查看某种Type下的方法,属性,字段和支持的接口等。using Sys
- 我就废话不多说了,大家还是直接看代码吧~//returnContent为获取到的返回参数System.out.println(returnC
- 本文实例为大家分享了OpenCV计算图像的水平和垂直积分投影的具体代码,供大家参考,具体内容如下#include <cv.h>
- 根据中国的国情,宽带共享遭受dns污染和HTTP拦截非常严重,造成网络请求的不稳定.但是ip/tcp协议一般不受影响。因此可以把域名先解析成
- 对谷歌地图操作使用的是WebBrowser控件,通过对javascript的操作来实现对谷歌地图的各种操作,所以首先要创建一个html文件,
- 1、直接使用getWindow().getDecorView().getRootView()直接使用getWindow().getDecor
- 本文实例为大家分享了java实现微信红包的具体代码,供大家参考,具体内容如下要求基于BigDecimal类实现微信红包算法的功能,比如设置红
- 本文实例讲解了统计文本中某个字符串出现的次数或字符串中指定元素出现的次数方法,分享给大家供大家参考,具体内容如下运行效果图:程序查找的上此文
- Java xml出现错误 javax.xml.transform.TransformerException: java.lang.NullP