RabbitMQ消息中间件示例详解
作者:-Finley- 发布时间:2021-12-10 23:09:58
前言
RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP)。
与 Kafka 等消息队列相比,RabbitMQ 最大的优势在于其较高的可靠性:
提供确认(ACK)和重传机制保证消息完成消费, 消费者异常不会导致消息丢失
提供消息持久化机制, broker 崩溃不会导致消息丢失
集群模式下工作, 保证高可用
因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理、秒杀等一致性要求较高的业务场景。
RabbitMQ 概念与机制
RabbitMQ 中的概念模型:
Broker: 消息中间件实例, 可能是单个节点也可能是运行在多节点集群上的逻辑实体
消息(Message): 消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义RabbitMQ对消息行为。消息体是字节流,包含消息内容。
连接(Connection): 客户端与 Broker 之间的 TCP连接
信道(Channel): Channel 是建立在 TCP 连接上的逻辑(虚拟)连接。多个 Channel 复用同一个 TCP 连接, 以避免建立 TCP 连接的巨大开销。 RabbitMQ 官方要求每个线程使用独立的 Channel, 禁止多个线程共用 Channel。
生产者(Publisher): 发送消息的客户端线程
消费者(Consumer): 处理消息的客户端线程
交换机(Exchange): 交换机负责将消息投递到相应的队列
队列(Queue): 接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出FIFO。
绑定(Binding): 将队列(Queue)注册到交换机(Exchange)的路由表
虚拟主机(Vhost): 每个Broker下可建立多个vhost, 每个 vhost 可建立独立的 Exchange、Queue、绑定及权限系统。同一个 Broker 下的 vhost 共享 Connection、Channel 和 用户系统,就是说可以使用同一个用户身份使用同一个 Channel 访问不同 vhost。
交换机(Exchange)
生产者发送的消息会首先送到交换机(Exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中。
RabbitMQ中的四种标准交换机:
direct: 如果消息的 routing-key 与队列的 binding-key 完全相同,direct类型的交换机则会将消息投递到该队列中。
多个队列可以使用相同的 binding-key 绑定到同一个 direct 交换机,direct 交换机会把消息投递到所有 binding-key 与消息 routing-key 相同的队列
topic: 允许队列的 binding-key 中包含通配符*和#, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中。
通配符按照关键字进行匹配,如news.cn.a中的关键字是news、cn和a,即关键字按照.分割
#通配符匹配0个或多个关键字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
*通配符匹配一个关键字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a
fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列
header: header 交换机根据消息头进行投递,现在已较少使用
我们可以使用 RabbitMQ 的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange。
消息头中的delivery-mode可以设置为 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 RabbitMQ 崩溃也不会丢失。
消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息, 即当有新消息时 Broker 通过 channel 主动向客户端发送消息。客户端也可以使用channel.basicGet从 Broker 拉取消息。
ACK机制
RabbitMQ 提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。
确认送达的回执有三种:
ACK: 消息已被成功处理
NACK: 消息处理异常, 需要重新投递
REJECT: 消息非法, 丢弃消息
RabbitMQ 的 Queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执。
channel.basicConsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。
Java 代码示例
首先安装并启动RabbitMQ实例, Mac用户可以使用 Homebrew 进行安装:
brew install rabbitmq
启动服务:
brew services start rabbitmq
或者使用官方docker镜像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。
RabbitMQ默认TCP端口为5672, Web控制台默认端口15672。
在Maven中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
编写生产者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author finley
*/
public class RabbitProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hello";
byte[] msg = "hello world".getBytes();
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
propsBuilder.deliveryMode(2); // persistent
propsBuilder.priority(0); // normal
propsBuilder.contentType("text/plain");
channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
}
}
}
编写消费者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* @author finley
*/
public class RabbitConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
String bindingKey = "hello";
channel.queueBind(queueName, exchangeName, bindingKey);
while(true) {
channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
String bodyStr = new String(body, "UTF-8");
System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
}
}
}
}
RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送。
来源:https://www.cnblogs.com/Finley/p/10126315.html
猜你喜欢
- JavaWeb登录界面登录失败在同一页面进行提示方法使用JSP 通过提交表单方式 判断账号密码是否正确 不正确则调用req.setAttri
- 方法一(推荐):设置datetimepicker的属性ShowCheckBox为true在窗口初始化时候,添加代码this.datetime
- 1.user实体package com.demo.dto;public class User { private Integer
- 一、概述本文描述采用C#语言访问控制Tektronix TBS 2000B 系列示波器。接口协议采用NI-VISA。最近一个项目需要和一款示
- using System;using System.Collections.Generic;using System.ComponentMo
- 骑缝章是用于往来业务合同,以确保合同真实、有效的印章加盖方法,是一种防范风险的重要方式。在Java程序中,可以通过使用工具来辅助加盖这种骑缝
- 本文实例讲述了在WPF中动态加载XAML中的控件的方法。分享给大家供大家参考,具体如下:using System;using System.
- 使用对象初始值设定项初始化对象可以使用对象初始值设定项以声明方式初始化类型对象,而无需显式调用类型的构造函数。下面的示例演示如何将对象初始值
- 下面对Java反射的基础知识进行说明和总结:首先定义一个MyBase类,其中有私有字段,也有公有字段。同时也有公有方法和私有方法。MyBas
- 前言easyui是一种基于jQuery的用户界面插件集合。easyui为创建现代化,互动,JavaScript应用程序,提供必要的功能。使用
- 实际上,HashSet 和 HashMap 之间有很多相似之处,对于 HashSet 而言,系统采用 Hash 算法决定集合元素的存储位置,
- idea pom文件图标不对今天遇到一个奇怪的现象,如下图原先pom的图标应该是有个m的,现在直接变成了xml的文件了。右边的Maven P
- 一 前言最近网上比较火的代码生成器,知识追寻者抽空试试了一下,感觉不是友好,只能说功能比较呆板吧,还需要自己玩填空题,修修补补,然后再次打开
- 先对问题进行描述:(1)在MyEclipse Servers视图在MyEclipse 10.7.0在某些情况下,可能无法打开。 试图打开视图
- 概述:Flutter中常用的滑动布局 ScrollView 有 SingleChildScrollView、NestedScrollView
- 首先项目A,也就是SpringBOOT项目中使用redisTemplate 来做REDIS的缓存时,你会发现存到REDIS里边的KEY和VA
- 背景环境需要设置代理才能够访问外部网络,如果只是运行java程序来访问网络,我们可以通过java -jar test.jar -Dproxy
- Contact联系人对Mms来说是十分重要的,因为每一个对话的收信人都是一个联系人,新建信息时可以输入联系人的任何信息,比如号码或名字,Mm
- springboot项目main函数启动在controller包下新建appController类package controller;im
- Android植物大战僵尸小游戏全部内容如下:相关下载:Android植物大战僵尸小游戏具体代码如下所示:package com.examp