关于Java整合RocketMQ实现生产消费详解
作者:叫我二蛋 发布时间:2022-12-31 07:44:34
环境搭建
创建Maven项目。
pom.xml文件中引入RocketMQ依赖。
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
生产者
普通消息
RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
同步发送
private final static String nameServer = "127.0.0.1:9876";
private final static String producerGroup = "my_group";
private final static String topic = "topic-test";
@Test
public void syncSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
异步发送
@Test
public void asyncSend() throws IOException {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("OK %s %n",
sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("Exception %s %n", e);
e.printStackTrace();
}
},10000);
} catch (Exception e) {
e.printStackTrace();
}
System.in.read();
}
单向传输
@Test
public void onewaySend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 启动producer
producer.start();
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.sendOneway(msg);
// 一旦producer不再使用,关闭producer
//producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
顺序消息
RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
@Test
public void orderSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 启动producer
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}
延迟消息
延迟消息发送是指消息发送到RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。 使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。 RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
投递等级 | 延迟时间 | 投递等级 | 延迟时间 |
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
@Test
public void scheduledSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 消息延迟等级
msg.setDelayTimeLevel(2);
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
批量消息
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
@Test
public void batchSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
List<Message> messages = new ArrayList<Message>();
messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes()));
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(messages, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
事务消息
在一些对数据一致性有强需求的场景,可以用RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息。 如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback)。 半事务消息只有 commit 状态才会真正向下游投递。 如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。
事务消息的详细交互流程如下图所示:
@Test
public void transactionSend() {
try {
// 事务消息的发送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 进行发送
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 事务回查的线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
//半事务消息发送成功后,执行本地事务的方法
public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
System.out.printf("执行本地事务 %n");
/*
二次确认
LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
*/
return LocalTransactionState.UNKNOW;
}
// 二次确认消息没有收到,Broker端回查事务状态的方法,默认60s
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.printf("二次确认失败,broker事务回查 %n");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.setSendMsgTimeout(10000);
// 启动producer
producer.start();
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送事务消息,并同步等待发送结果
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用,关闭producer
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}
}
消费者
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push消费
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
private final static String nameServer = "127.0.0.1:9876";
private final static String consumerGroup = "my_group";
private final static String topic = "topic-test";
@Test
public void consumerPush() throws MQClientException, IOException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置NameServer地址
consumer.setNamesrvAddr(nameServer);
// 订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe(topic, "*");
//设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
//consumer.setMessageModel(MessageModel.BROADCASTING);
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 顺序消费
// consumer.registerMessageListener(new MessageListenerOrderly() {
// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// return ConsumeOrderlyStatus.SUCCESS;
// }
// });
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
System.in.read();
}
Pull 消费
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
@Test
public void consumerPull() {
try {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
//关闭自动提交
consumer.setAutoCommit(false);
consumer.subscribe(topic, "*");
consumer.setPullBatchSize(20);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
System.out.printf("%s%n", messageExts);
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
}
}
代码仓库
https://gitee.com/codeWBG/learn_rocketmq
来源:https://wangbinguang.blog.csdn.net/article/details/128452607


猜你喜欢
- 1. matlab的lp2lp函数的作用去归一化 H(s) 的分母2. matlab的lp2lp函数的使用方法[z, p, k]=butta
- Java事件处理机制java中的事件机制的参与者有3种角色:1.event object:事件状态对象,用于listener的相应的方法之中
- 前言异常崩溃,是Android项目中一项比较棘手的问题,即便做了很多的try - catch处理,也不能保证上线不会崩,而且一旦出现崩溃,就
- 1.动态绑定机制java的动态绑定机制非常重要实例A我们来看一个实例:阅读上面的代码,请说明下面的程序将输出什么结果:程序将会输出40和30
- C#之委托委托:顾名思义,让别人帮你办件事。委托是C#实现回调函数的一种机制。可能有人会问了,回调函数是个啥???举个例子:我现在是一家公司
- java返回json请求中文变成问号原来在个人项目时,用layui的数据表格获取数据时,不会出现中文变问号问题后来换了个项目,发现返回的js
- 1. JAVA源文件的命名JAVA源文件名必须和源文件中所定义的类的类名相同。2. Package的命名Package名的第一部分应是小写A
- 前段时间在写直播的时候,需要观众在看直播的时候点赞的效果,在此参照了腾讯大神写的点赞(飘心动画效果)。下面是效果图:1.自定义飘心动画的属性
- 这段时间用到了QT的TCP通信,做了初步的学习与尝试,编写了一个客户端和服务器基于窗口通信的小例程。使用QT的网络套接字需要.pro文件中加
- 本文实例讲述了C#验证码识别基础方法,是非常实用的技巧。分享给大家供大家参考。具体方法分析如下:背景最近有朋友在搞一个东西,已经做的挺不错了
- 原理是使用LinkedHashMap来实现,当缓存超过大小时,将会删除最老的一个元组。实现代码如下所示import java.util.Li
- 一、HashMap概述HashMap基于哈希表的 Map 接口的实现。此实现提供所有可选的映射操作,并允许使用 null 值和 null 键
- 总体实现思路是启动一个生产者项目注册, 将所含服务注册到zookeeper的注册中心, 然后在启动一个消费者项目,将所需服务向zookeep
- 异常的定义在java中,异常就是java在编译、运行或运行过程中出现的错误总共有三种:1.编译错误 2.运行错误 3.逻辑错误1.编译错误是
- 前言在我的申请下,公司终于购买了一台基于Android12.0的手机,然后我就开心的拿去安装测试了,发现程序崩溃了,于是我这里就写下来,An
- 前言图形相交检测常常用在伤害判定,使用自定义的图形相交检测,可以在一定程度上控制性能。比如2D格斗游戏中使用的矩形包围盒(AABB),一些动
- 这篇文章主要介绍了如何通过SpringBoot实现商城秒杀系统,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,
- 屏幕切换指的是在同一个Activity内屏幕间的切换,ViewFlipper继承了Framelayout类,ViewAnimator类的作用
- SpringMVC路径匹配中使用通配符@RequestMapping中指定的路径也可以使用通配符*表示任意字符。如下的处理器方法可以映射请求
- java LRU(Least Recently Used )详解LRU是Least Recently Used 的缩写,翻译过来就是“最近最