软件编程
位置:首页>> 软件编程>> C#编程>> C#用RabbitMQ实现消息订阅与发布

C#用RabbitMQ实现消息订阅与发布

作者:Alan.hsiang  发布时间:2022-09-05 16:23:40 

标签:C#,RabbitMQ,消息订阅,消息发布
目录
  • Fanout交换机模型

  • RabbitMQ控制台操作

    • 新增两个队列

    • 绑定fanout交换机

  • 示例效果图

    • 核心代码

      • 消息发布

      • 消息订阅

    Fanout交换机模型

    扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    C#用RabbitMQ实现消息订阅与发布

    RabbitMQ控制台操作

    新增两个队列

    在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    绑定fanout交换机

    将两个队列绑定到系统默认的fanout交换机,如下所示:

    C#用RabbitMQ实现消息订阅与发布

    示例效果图

    生产者,采用Fanout类型交换机发布消息,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

     当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

    C#用RabbitMQ实现消息订阅与发布

    核心代码

    消息发布

    建立连接后,将通道声明类型为Fanout的交换机,如下所示:


    /// <summary>
       /// fanout类型交换机,发送消息
       /// </summary>
       public class RabbitMqFanoutSendHelper : RabbitMqHelper {
           /// <summary>
           /// 发送消息
           /// </summary>
           /// <param name="msg"></param>
           /// <returns></returns>
           public bool SendMsg(string msg)
           {
               try
               {
                   using (var conn = GetConnection("/Alan.hsiang"))
                   {
                       using (var channel = conn.CreateModel())
                       {
                           channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);

    var body = Encoding.UTF8.GetBytes(msg);

    channel.BasicPublish(exchange: "amq.fanout",
                                                routingKey: "",
                                                basicProperties: null,
                                                body: body);

    //Console.WriteLine(" [x] Sent {0}", message);
                       };
                   };
                   return true;
               }
               catch (Exception ex)
               {
                   throw ex;
               }
           }
       }

    消息订阅

    建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:


    /// <summary>
       /// 扇形交换机接收消息
       /// </summary>
       public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
       {
           public RabbitMqReceiveEventHandler OnReceiveEvent;

    private IConnection conn;

    private IModel channel;

    private EventingBasicConsumer consumer;

    public bool StartReceiveMsg(string queueName)
           {
               try
               {
                   conn = GetConnection("/Alan.hsiang");

    channel = conn.CreateModel();
                   channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
                   //此处随机取出交换机下的队列
                   //var queueName = channel.QueueDeclare().QueueName;
                   channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
                   consumer = new EventingBasicConsumer(channel);
                   consumer.Received += (model, ea) =>
                   {
                       var body = ea.Body.ToArray();
                       var message = Encoding.UTF8.GetString(body);
                       //Console.WriteLine(" [x] Received {0}", message);
                       if (OnReceiveEvent != null)
                       {
                           OnReceiveEvent(queueName+"::"+message);
                       }
                   };
                   channel.BasicConsume(queue: queueName,
                                           autoAck: true,
                                           consumer: consumer);
                   return true;
               }
               catch (Exception ex)
               {
                   throw ex;
               }
           }
       }

    作者:Alan.hsiang
    出处:http://www.cnblogs.com/hsiang/

    来源:https://www.cnblogs.com/hsiang/p/14771629.html

    0
    投稿

    猜你喜欢

    手机版 软件编程 asp之家 www.aspxhome.com