图文并茂讲解RocketMQ消息类别
作者:一个双子座的Java攻城狮 发布时间:2023-06-11 07:59:41
标签:RocketMQ,消息类别,消息类型
1、同步消息
即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
//同步消息发送
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
}
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
}
});
consumer.start();// 开启多线程 监控消息,持续运行
System.out.println("接收消息服务已运行");
}
}
测试:
2、异步消息
即时性较弱,但需要有回执的消息,例如订单中的某些信息
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//异步消息发送
Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
//表示成功返回结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.23.127:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
}
});
consumer.start();// 开启多线程 监控消息,持续运行
System.out.println("接收消息服务已运行");
}
}
测试:
3、单向消息
不需要有回执的消息,例如日志类消息
生产者:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//单向消息
Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendOneway(msg);
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
消费者代码同上
测试:
总结 同步消息
SendResult result = producer.send(msg);
异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
producer.send(msg, new SendCallback() {
//表示成功返回结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
单向消息
producer.sendOneway(msg);
来源:https://blog.csdn.net/weixin_64061088/article/details/128406813


猜你喜欢
- 此方案适用于解决springboot项目运行时动态添加数据源,非静态切换多数据源!!!一、多数据源应用场景:1.配置文件配置多数据源,如默认
- 1、idea构建web项目idea构建web项目的超级详细教程,一步一步来,完全没问题!1、新建一个空项目2、新建java模块,名为webD
- 数组的定义数组本质上就是让我们能 " 批量 " 创建相同类型的变量。数组的三种语法格式1、 数据类型 [] 数组名称 =
- class文件中的fields_count和fieldsfields_count描述的是当前的类中定义的字段的个数,注意,这里包括静态字段,
- 使用客户端打开指定的URL使用Process.Start方法可以在浏览器打开指定的URL。代码如下所示。[C#]//使用客户端打开“http
- 题目描述:一个农夫带着一匹狼、一只羊、一颗白菜要过河,只有一条船而且农夫每次最多只能带一个动物或物品过河,并且当农夫不在的时候狼会吃羊,羊会
- 在线扫描相机的调试过程中,需要开辟调试界面来进行位置的配置。调试结束后,一种常用的方式是将调试参数保存并在下次启动时加载。另一种简单方式是直
- 一次性全部绘制出来实现代码import java.awt.*;public class AlgoVisualizer {private st
- 后台控制层: public static final String HEAD_IMG_DIR = "D:/upload/&quo
- <FrameLayout &
- 面对android studio Run 一次项目要等好几分钟的痛点,不得不研究一下android studio 的单元测试。其实我的目的很
- 目录问题描述解决办法本文介绍下PasswordBox进行数据绑定的方法,本文参考链接。本文完整示例程序见GitHub。问题描述Passwor
- 1、不要为抽象类提供公开的构造方法抽象类可以有构造方法,但是抽象类不能实例化。如果编程人员没有制定构造方法,编译器会自动生成一个默认的pro
- 题目描述Java创建线程的几种方式Java使用Thread类代表线程,所有线程对象都必须是Thread类或者其子类的实例。Java可以用以下
- 如何实现使用TextView的DrawableLeft使图片和文字居中显示呢???代码如下: 1.首先自定义一个类,继承TextViewpa
- SpringMVC @RequestBody自动转json Http415错误项目中想用@RequestBody直接接收json串转成对象网
- import java.util.concurrent.Semaphore;public class ThreeThread {
- Mybatis-Plus将字段设置为null项目场景:最近在做一个需求的时候需要把数据库中的某个字段设置为空问题描述:在代码中通过set方法
- 前序(先序)遍历中序遍历后续遍历层序遍历如图二叉树:二叉树结点结构public class TreeNode { int val
- 先如今idea中的spring项目,springBoot的项目的开发一般都是基于maven创建的项目。这大大简化我我们对于各种依赖包的管理,