软件编程
位置:首页>> 软件编程>> java编程>> Springboot整合redis实现发布订阅功能介绍步骤

Springboot整合redis实现发布订阅功能介绍步骤

作者:逆风飞翔的小叔  发布时间:2021-11-23 16:42:26 

标签:Springboot,redis,发布,订阅

一、redis发布订阅简介

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。可以参考下面两张图进行理解。

Springboot整合redis实现发布订阅功能介绍步骤

Springboot整合redis实现发布订阅功能介绍步骤

二、几个核心概念解释

1.频道

频道(channel)类似于一个快递柜,快递员往里面放快递,收件人去里面取快递。管道(channel)是由中间件(redis)提供的,一个redisServer中有多个channel。

2、消息发布者

可以理解为消息的生产者,消息发布者通过中间件(redis、mq等)向某个频道(管道)发送消息。

3、消息接收者

也可以理解为消息消费者,消息接收者通过订阅某个频道(管道)来接收发布者发布的消息。

发布者无需关心是否有人接收消息,发布者只需要把消息发布到某个管道中即可;

三、适用场景

1、核心业务完成后,非核心业务需要记录日志,发邮件,发短信之类的操作,一般来说,通过这种方式,核心业务与非核心业务起到了一个解耦的作用;

2、事件订阅,比如订阅UP主,博主相关的消息等;

3、监听事件,比如在分布式微服务场景下,当应用A的某个数据发生变化时,应用B需要同步更新自己的数据做自身业务操作,对于应用A来说并不关心哪个应用,就可以通过这种方式实现;

四、与springboot的整合

1、导入基础依赖

<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-redis</artifactId>
       </dependency>

2、配置文件

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://IP:3306/school?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
    username: root
    password: root
    druid:
      max-active: 100
      initial-size: 10
      max-wait: 60000
      min-idle: 5
  redis:
    host: localhost
    port:  6379
  cache:
    type: redis

3、自定义RedisSubConfig

往容器(RedisMessageListenerContainer)内添加消息 * ,注意,container的参数列表是可以传多个 * 的,但是要定义 * 的bean。在定义 * 的方法体内绑定消息处理器和管道(channel),一个 * 可以监听多个管道,可以通过数组或者添加多个channel的方式定义;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisSubConfig {
   @Bean
   public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {
       RedisMessageListenerContainer container = new RedisMessageListenerContainer();
       container.setConnectionFactory(factory);
       //订阅频道redis.news 和 redis.life  这个container 可以添加多个 messageListener
       container.addMessageListener(listener, new ChannelTopic("redis.user"));
       //container.addMessageListener(listener, new ChannelTopic("redis.news"));
       return container;
   }
}

4、自定义消息 *

需要实现MessageListener 接口,重写里面的onMessage方法,方法体内需要创建一个MessageListenerAdapter(这是一种规范写法,用于绑定消息处理器和 * )。

这种写法和很多消息中间件对应的API很相似,即通过一个监听的代码块来完成监听到消息后具体的业务操作;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisMessageListener implements MessageListener {
   @Autowired
   private RedisTemplate redisTemplate;
   @Override
   public void onMessage(Message message, byte[] pattern) {
       // 获取消息
       byte[] messageBody = message.getBody();
       // 使用值序列化器转换
       Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
       // 获取监听的频道
       byte[] channelByte = message.getChannel();
       // 使用字符串序列化器转换
       Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
       // 渠道名称转换
       String patternStr = new String(pattern);
       System.out.println(patternStr);
       System.out.println("---频道---: " + channel);
       System.out.println("---消息内容---: " + msg);
   }
}

5、redistemplate的序列化

@Bean
   public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
       RedisTemplate<Object, Object> template = new RedisTemplate<>();
       template.setConnectionFactory(connectionFactory);
       //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
       ObjectMapper mapper = new ObjectMapper();
       mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
       mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
       jackson2JsonRedisSerializer.setObjectMapper(mapper);
       template.setValueSerializer(jackson2JsonRedisSerializer);
       //使用StringRedisSerializer来序列化和反序列化redis的key值
       template.setKeySerializer(new StringRedisSerializer());
       template.afterPropertiesSet();
       return template;
   }

6、功能测试

下面写一个测试的接口,模拟业务处理完毕后,向 redis.user 这个通道发送一条消息,看看 * 中是否能够正常接收到消息即可;

@RestController
public class RedisPubController {
   @Autowired
   private RedisTemplate redisTemplate;
   @GetMapping("/getUserById")
   public String getUserById(String userId){
       //TODO 执行主业务
       redisTemplate.convertAndSend("redis.user", userId);
       return "hello:" + userId;
   }
}

启动工程后调用接口:

http://localhost:8083/getByUserIdId?userId=1

通过断点可以观察发现,通道中的消息被 * 逻辑监听到了,接下来就可以愉快的处理自己的业务了;

Springboot整合redis实现发布订阅功能介绍步骤

本篇通过案例演示了如何基于springboot整合使用redis的发布订阅功能,更多的细节需要结合实际的开发场景进行完善

来源:https://blog.csdn.net/congge_study/article/details/126686318

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com