Springboot 整合 RocketMQ 收发消息的配置过程
作者:liqiangbk 发布时间:2023-01-22 22:49:28
标签:Springboot,整合,RocketMQ,收发消息
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml 配置
application.yml
rocketmq:
name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq:
producer:
group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t ;
public void send(){
//发送同步消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
t.send("Topic1:TagA",message);
//发送异步消息
t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
}
}
消费者
package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate t;
public void send(){
Message<String> message = MessageBuilder.withPayload("Hello world").build();
//一旦发送消息,则执行 *
t.sendMessageInTransaction("Topic2",message,null);
}
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
}
消费者
package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到"+s);
}
}
主类
package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
测试类
package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
@Autowired
private Producer producer;
@Test
public void test1(){
producer.send();
//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/liqbk/p/13677137.html
0
投稿
猜你喜欢
- 主窗体Form1关键代码: 将子窗体最为对话框模式弹出,当窗体关闭或取消时更新主窗体 private void simpleButton1_
- 一、封装一个工具类1、简易版package net.aexit.construct.acceptance.websky.utils;impo
- Mybatis无法获取带有下划线前缀的字段的值今天下面,把几张表里的字段都加了前缀,如 article_id,article_title,a
- 图像切换器(ImageSwitcher),用于实现类似于Windows操作系统的“Windows照片查看器”中的上一张、下一张切换图片的功能
- 一、java内存区域Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域都有各自的用途,以及创建和
- 本文实例讲述了C#实现简单合并word文档的方法。分享给大家供大家参考。具体如下:using System;using System.Col
- 1、下载内嵌浏览器Jar包下载地址:点击下载2、项目下加入对应jar;然后右键:Add as Library...3、添加启动项目后事件效果
- 问题描述通过FeignClient调用微服务提供的分页对象IPage报错{"message": "Type d
- 在Java世界中,AES、DES加密解密需要使用Cipher对象构建加密解密系统,Hutool中对这一对象做再包装,简化了加密解密过程。介绍
- 第1 部分 hashCode的作用Java集合中有两类,一类是List,一类是Set他们之间的区别就在于List集合中的元素师有序的,且可以
- 定义:一个对象应该对其他对象了解最少迪米特法则的核心观念就是类间解耦,弱耦合,只有弱耦合了以后,类的复用性才可以提高。形象一点的比喻类似于:
- 本文讲解2点:1. fastjson生成和解析json数据(举例:4种常用类型:JavaBean,List<JavaBean>,
- Spring Boot 异常处理异常处理是一种识别并响应错误的一致性机制,异常机制可以把程序中的异常处理代码和正常的业务逻辑代码分离,包装程
- 在上一篇笔记 《SpringMVC实现图片上传》记录了将图片上传到本地的实现,在很多项目中都会有一台专门的文件服务器来保存文件的,这边记录下
- 简要说明本文采用ImageSwitcher实现左右滑动切换图片。首先调用setFactory方法,设置视图工厂;然后设置手指触碰监听,判断左
- 2011年6月iBatis 更名为 MyBatis,从 iBatis 到 MyBatis,不只是名称上的变化,MyBatis 提供了更为强大
- Map在Java8中新增了两个replace的方法1.replace(k,v)在指定的键已经存在并且有与之相关的映射值时才会将指定的键映射到
- 这篇文章主要介绍了Java 使用Calendar类输出指定年份和月份的日历,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参
- 比如在类上使用该注解 @Alias("dDebtEntity")则在mapper.xml文件中resultType=&q
- 参考链接亲测试以下版本成功激活附激活教程。idea下载链接(对应版本号下载):https://www.jetbrains.com/idea/