Springboot 整合 RocketMQ 收发消息的配置过程
作者:liqiangbk 发布时间:2023-01-22 22:49:28
标签:Springboot,整合,RocketMQ,收发消息
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml 配置
application.yml
rocketmq:
name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t ;
public void send(){
//发送同步消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
t.send("Topic1:TagA",message);
//发送异步消息
t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
}
}
消费者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t;
public void send(){
Message<String> message = MessageBuilder.withPayload("Hello world").build();
//一旦发送消息,则执行 *
t.sendMessageInTransaction("Topic2",message,null);
}
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
}
消费者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/liqbk/p/13677137.html


猜你喜欢
- 1.Java运行环境搭建,对于初学者来说,主要下载安装jdk即可,windows操作系统再配合记事本,即可进行java程序开发。后续的学习以
- 我在5月份的时候就申请了洞态IAST企业版内测,算是比较早的一批用户了。聊聊几个我比较在意的问题,比如API接口覆盖率、第三方开源组件检测以
- 多网卡环境下Eureka服务注册IP选择问题场景服务器上分别配置了eth0和eth1两块网卡,只有eth1的地址可供其它机器访问,在这种情况
- Collection是最基本的集合接口,一个Collection代表一组Object,即Collection的元素(Elements)。一些
- 网上文章虽多,但是这种效果少之又少,我真诚的献上以供大家参考实现原理:自定义ImageView对此控件进行相应的layout(动态布局).这
- 本文实例为大家分享了java实现双色球抽奖的具体代码,供大家参考,具体内容如下实现双色球先考虑整体思路:1.随机生成7位数的数组为大奖号码(
- 在面向对象的概念中,所有的对象都是通过类来描绘的,但是反过来,并不是所有的类都是用来描绘对象的,如果一个类中没有包含足够的信息来描绘一个具体
- Spring多数据源实现的方式大概有2中,一种是新建多个MapperScan扫描不同包,另外一种则是通过继承AbstractRoutingD
- 收费版本:Rainbow Brackets免费版本:Rainbow Brackets Lite介绍一款可以将 (圆括号) [方括号] {花括
- Android API Demos中有很多非常Nice的例子,这些例子的代码都写的很出色,如果大家把API Demos中的每个例子研究透了,
- 这篇文章主要介绍了简单了解Java中的可重入锁,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参
- 关于startService的基本使用概述及其生命周期可参见《Android中startService基本使用方法概述》。本文通过批量下载文
- 问题在使用 Abp 框架的后台作业时,当后台作业抛出异常,会导致整个程序崩溃。在 Abp 框架的底层执行后台作业的时候,有 try/catc
- 为了避免直接进入项目中存在的页面,使用filter过滤器新建一个类loginFilter:package com.tjcu.filter;i
- Android 自动获取验证码的两种方式分别是BroadcastReceiver及ContentObserver,两种方式都需要进行注册、取
- 如何解决某个节点故障的问题?如何解决数据一致性的问题?如何解决数据倾斜的问题?CAP理论先从定义开始:C(Consistence):一致性所
- java 对象的克隆一、对象的浅克隆(1)需要克隆类需要重写Object类的clone方法,并且实现Cloneable接口(标识接口,无需实
- 如下所示://判断整数(int)private boolean isInteger(String str) {if (null == str
- SpringBoot停止启动时测试检查rabbitmq问题在Springboot项目中配置rabbitmq后,总是在每次启动时自动测试MQ的
- 本文实例讲述了Android判断Activity是否在最上层的方法。分享给大家供大家参考,具体如下:private boolean isTo