
Integrating RabbitMQ with Spring Boot: A Detailed Guide

Author: 喜羊羊love红太狼  Published: 2022-08-19 02:28:33

Tags: springBoot, integration, rabbitMQ

Add the dependencies to pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wxy</groupId>
<artifactId>test-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>test-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
       <dependency>
           <groupId>junit</groupId>
           <artifactId>junit</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

Test


package com.wxy.rabbit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// @SpringBootTest already registers the JUnit 5 SpringExtension, so the
// JUnit 4 @RunWith(SpringRunner.class) is not needed with a Jupiter @Test.
@SpringBootTest
class TestRabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage() {
        String exchange = "exchange.direct";
        String routingKey = "wxy.news";
        // The Object argument is the message body; the configured
        // MessageConverter serializes it automatically.
        Map<String, Object> msg = new HashMap<>();
        msg.put("msg", "message sent via MQ");
        msg.put("data", Arrays.asList("helloworld", 123456, true));
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }

    @Test
    public void receive() {
        // Takes one message from the queue "wxy.news" and converts its body
        // back to an object; returns null if the queue is empty.
        Object object = rabbitTemplate.receiveAndConvert("wxy.news");
        System.out.println(object);
    }

}

Default message converter


// RabbitTemplate uses SimpleMessageConverter by default
private MessageConverter messageConverter = new SimpleMessageConverter();

// Source of SimpleMessageConverter.fromMessage: serialized objects are read with SerializationUtils.deserialize
public Object fromMessage(Message message) throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        if (contentType != null && contentType.startsWith("text")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }

            try {
                content = new String(message.getBody(), encoding);
            } catch (UnsupportedEncodingException var8) {
                throw new MessageConversionException("failed to convert text-based Message content", var8);
            }
        } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
            try {
                content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
            } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                throw new MessageConversionException("failed to convert serialized Message content", var7);
            }
        }
    }

    // Any other content type falls through and the raw byte[] body is returned
    if (content == null) {
        content = message.getBody();
    }

    return content;
}
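
To make the "application/x-java-serialized-object" branch more concrete, here is a minimal sketch that uses only the JDK and no Spring classes. It serializes a map like the one in the test with standard Java serialization and reads it back. The class name SerializedBodyDemo and its helper methods are hypothetical and exist only for this illustration. The resulting bytes start with the Java serialization stream header 0xACED and are not human-readable, which is the motivation for switching to JSON later in the article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class: plain JDK serialization, no Spring AMQP involved.
class SerializedBodyDemo {

    // Turns a Serializable object into bytes using standard Java serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the object back from the serialized bytes.
    static Object deserialize(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> msg = new HashMap<>();
        msg.put("msg", "message sent via MQ");
        msg.put("data", Arrays.asList("helloworld", 123456, true));

        byte[] body = serialize(msg);
        // The first two bytes are the serialization stream magic number.
        System.out.printf("%02x%02x%n", body[0] & 0xFF, body[1] & 0xFF); // prints "aced"
        // The round trip yields an equal map.
        System.out.println(msg.equals(deserialize(body))); // prints "true"
    }
}
```

Both sides of such an exchange must be Java programs with the same classes on the classpath, which is another reason a text format such as JSON is often preferred.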

Switching the default message format to custom JSON


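First: the SimpleMessageConverter shown above is one implementation of the MessageConverter interface in the org.springframework.amqp.support.converter package.

Second: check which conversions the MessageConverter interface supports.
Press Ctrl+H to list all implementations of the interface.

Third: find the JSON-related converter.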


RabbitTemplateConfigurer checks this.messageConverter != null; if a MessageConverter has been configured, it is applied to the RabbitTemplate in place of the default.

// If this.messageConverter != null, the configured messageConverter is used
public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
    PropertyMapper map = PropertyMapper.get();
    template.setConnectionFactory(connectionFactory);
    if (this.messageConverter != null) {
        template.setMessageConverter(this.messageConverter);
    }

    template.setMandatory(this.determineMandatoryFlag());
    Template templateProperties = this.rabbitProperties.getTemplate();
    if (templateProperties.getRetry().isEnabled()) {
        template.setRetryTemplate((new RetryTemplateFactory(this.retryTemplateCustomizers)).createRetryTemplate(templateProperties.getRetry(), Target.SENDER));
    }

    map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
    map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
    map.from(templateProperties::getExchange).to(template::setExchange);
    map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
    map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
}

Configure a MessageConverter (from the org.springframework.amqp.support.converter package)


package com.wxy.rabbit.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverConfig {

    @Bean
    public MessageConverter getMessageConvert() {
        return new Jackson2JsonMessageConverter();
    }
}

Send the message again; the message body is now in JSON format

Listening with the @RabbitListener annotation

Listening to multiple queues


@RabbitListener(queues = {"wxy.news","wxy.emps"})

Listening to a single queue


@RabbitListener(queues = "wxy.news")

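package com.wxy.rabbit.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
public class RabbitMqReceiveService {

    // Both listeners consume from the same queues, so each message is
    // delivered to only one of them.
    @RabbitListener(queues = {"wxy.news", "wxy.emps"})
    public void getReceiveMessage() {
        System.out.println("Received a new message");
    }

    @RabbitListener(queues = {"wxy.news", "wxy.emps"})
    public void getReceiveMessageHead(Message message) {
        // getBody() returns a byte[]; decode it so the content is printed
        // rather than the array reference.
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        System.out.println(message.getMessageProperties());
    }

}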
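
A note on printing the body: Message.getBody() returns a byte[], and passing a byte array straight to System.out.println prints only the array's type and identity hash. The JDK-only sketch below shows the difference. It assumes the payload is UTF-8 text such as JSON, and the class name BodyPrintingDemo is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: shows why a raw byte[] body should be decoded before printing.
class BodyPrintingDemo {

    // Decodes a raw body as UTF-8 text; appropriate for JSON or plain-text payloads.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "{\"msg\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        // Prints something like "[B@1b6d3586": the array type plus an identity hash.
        System.out.println(body);
        // Prints the actual JSON text.
        System.out.println(decode(body));
    }
}
```

This decoding only makes sense for text payloads; a body produced by Java serialization is binary and would print as garbage.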

Creating a queue and an exchange in code and binding them


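// Add to the test class above; Spring Boot auto-configures the AmqpAdmin bean.
// Requires imports of AmqpAdmin, Binding, DirectExchange and Queue from org.springframework.amqp.core.
@Autowired
AmqpAdmin amqpAdmin;

@Test
public void create() {
    // Create a direct (point-to-point) exchange
    amqpAdmin.declareExchange(new DirectExchange("amqpexchange.direct"));
    // Create a queue
    // String name: the queue name
    // boolean durable: whether the queue is persisted
    amqpAdmin.declareQueue(new Queue("amqp.queue", true));
    // Bind the queue to the exchange
    // Arguments: String destination, Binding.DestinationType destinationType,
    // String exchange, String routingKey, @Nullable Map<String, Object> arguments
    amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,
            "amqpexchange.direct", "wxy.news", null));
}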
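
To illustrate what the binding achieves, here is a toy in-memory model of direct-exchange routing written with the JDK only. It is not RabbitMQ or Spring AMQP code; the class DirectExchangeModel and its methods are hypothetical and only mimic the rule that a direct exchange delivers a message to the queues whose binding key equals the message's routing key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a direct exchange; not part of any RabbitMQ API.
class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // Registers a queue and binds it to this exchange under the given key.
    void bind(String queue, String bindingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Delivers the message to every queue whose binding key equals the routing key
    // and returns how many queues received it.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    // Returns the messages currently held by the queue (empty if the queue is unknown).
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("amqp.queue", "wxy.news");

        System.out.println(exchange.publish("wxy.news", "hello"));    // prints 1: key matches
        System.out.println(exchange.publish("wxy.other", "dropped")); // prints 0: no matching binding
        System.out.println(exchange.messagesIn("amqp.queue"));        // prints [hello]
    }
}
```

In the example above, this means a message sent to "amqpexchange.direct" reaches "amqp.queue" only when its routing key is exactly "wxy.news".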

Source: https://blog.csdn.net/qq_38423256/article/details/115770100
