C#用RabbitMQ实现消息订阅与发布
作者:Alan.hsiang 发布时间:2022-09-05 16:23:40
标签:C#,RabbitMQ,消息订阅,消息发布
目录
Fanout交换机模型
RabbitMQ控制台操作
新增两个队列
绑定fanout交换机
示例效果图
核心代码
消息发布
消息订阅
Fanout交换机模型
扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
RabbitMQ控制台操作
新增两个队列
在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:
绑定fanout交换机
将两个队列绑定到系统默认的fanout交换机,如下所示:
示例效果图
生产者,采用Fanout类型交换机发布消息,如下图所示:
当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:
当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:
核心代码
消息发布
建立连接后,将通道声明类型为Fanout的交换机,如下所示:
/// <summary>
/// fanout类型交换机,发送消息
/// </summary>
public class RabbitMqFanoutSendHelper : RabbitMqHelper {
/// <summary>
/// 发送消息
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
public bool SendMsg(string msg)
{
try
{
using (var conn = GetConnection("/Alan.hsiang"))
{
using (var channel = conn.CreateModel())
{
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "amq.fanout",
routingKey: "",
basicProperties: null,
body: body);
//Console.WriteLine(" [x] Sent {0}", message);
};
};
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
消息订阅
建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:
/// <summary>
/// 扇形交换机接收消息
/// </summary>
public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
{
public RabbitMqReceiveEventHandler OnReceiveEvent;
private IConnection conn;
private IModel channel;
private EventingBasicConsumer consumer;
public bool StartReceiveMsg(string queueName)
{
try
{
conn = GetConnection("/Alan.hsiang");
channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
//此处随机取出交换机下的队列
//var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Console.WriteLine(" [x] Received {0}", message);
if (OnReceiveEvent != null)
{
OnReceiveEvent(queueName+"::"+message);
}
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
return true;
}
catch (Exception ex)
{
throw ex;
}
}
}
作者:Alan.hsiang
出处:http://www.cnblogs.com/hsiang/
来源:https://www.cnblogs.com/hsiang/p/14771629.html


猜你喜欢
- 应用场景:在Android开发过程中,有时需要调用手机自身设备的功能,上篇文章主要侧重摄像头拍照功能的调用。本篇文章将综合实现拍照与视频的操
- 对于Android View的测量,我们一句话总结为:
- 记录一下工作流的在Springboot中的使用,,顺便写个demo,概念,什么东西的我就不解释了,如有问题欢迎各位大佬指导一下。1.创建sp
- 一、背景单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。假设一开始用户连接了n
- 突然需要改一堆文件的后缀名,所以想编程解决,话不多说直接上代码javaimport java.io.File;import java.uti
- 概念异常处理的概念起源于早期的编程语言,如 LISP、PL/I 和 CLU。这些编程语言首次引入了异常处理机制,以便在程序执行过程中检测和处
- 这里简单介绍一下ZXing库。ZXing是一个开放源码的,用Java实现的多种格式的1D/2D条码图像处理库,它包含了联系到其他语言的端口。
- 真实的多线程业务开发中,最常用到的逻辑就是数据的读写,ReentrantLock虽然具有完全互斥排他的效果(即同一时间只有一个线程正在执行l
- 一、延迟加载:LazyLoading使用延迟加载,关联的实体必须标注为virtual。本例是标注Destination类里的Lodgings
- 日志是非常重要的,虽然他不会以需求功能提来,但也不会体现在产品方案中。但是,它在系统项目中却占有巨大的地位。为了保证服务的高可用,发现问题一
- 本文实例讲述了Java之JFrame输出Helloworld的方法。分享给大家供大家参考。具体如下:JAVA的GUI程序的基本思路是以JFr
- Java虚拟机栈1. 定义栈:线程运行时需要的内存空间,一个栈存在多个栈帧。栈具有先入后出,后入先出的特点。栈帧:每个方法运行时需要的内存(
- 一、前言 接触面向对象的思想已经有一段时光了,为什么要学习面向对象呢
- 一、前序遍历1.题目描述给你二叉树的根节点 root ,返回它节点值的 前序 遍历。2.输入输出示例示例 1:输入:root = [1,nu
- 本文实例为大家分享了Android端实现文件上传的具体代码,供大家参考,具体内容如下1)、新建一个Android项目命名为androidUp
- 下面对Java反射的基础知识进行说明和总结:首先定义一个MyBase类,其中有私有字段,也有公有字段。同时也有公有方法和私有方法。MyBas
- 最近一个项目要导出word文档,折腾老半天,发现还是用freemarker的模板来搞比较方便省事,现总结一下关键步骤,供大家参考,这里是一个
- 1.打开idea -> file -> settings ->Plugins 搜索Free Mybatis p
- 1.View的坐标参数 主要有哪些?分别有什么注意的要点?答:Left,Right,top,Bottom 注意这4个值其实就是 view 和
- 引言上文Android:实现一个自定义有限制区域的图例(角度自识别)涂鸦工具类(中)中我们已经实现了在复杂的异形区域中涂鸦,最后生成图片保存