Springboot整合redis实现发布订阅功能介绍步骤
作者:逆风飞翔的小叔 发布时间:2021-11-23 16:42:26
一、redis发布订阅简介
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。可以参考下面两张图进行理解。
二、几个核心概念解释
1.频道
频道(channel)类似于一个快递柜,快递员往里面放快递,收件人去里面取快递。管道(channel)是由中间件(redis)提供的,一个redisServer中有多个channel。
2、消息发布者
可以理解为消息的生产者,消息发布者通过中间件(redis、mq等)向某个频道(管道)发送消息。
3、消息接收者
也可以理解为消息消费者,消息接收者通过订阅某个频道(管道)来接收发布者发布的消息。
发布者无需关心是否有人接收消息,发布者只需要把消息发布到某个管道中即可;
三、适用场景
1、核心业务完成后,非核心业务需要记录日志,发邮件,发短信之类的操作,一般来说,通过这种方式,核心业务与非核心业务起到了一个解耦的作用;
2、事件订阅,比如订阅UP主,博主相关的消息等;
3、监听事件,比如在分布式微服务场景下,当应用A的某个数据发生变化时,应用B需要同步更新自己的数据做自身业务操作,对于应用A来说并不关心哪个应用,就可以通过这种方式实现;
四、与springboot的整合
1、导入基础依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、配置文件
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://IP:3306/school?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
username: root
password: root
druid:
max-active: 100
initial-size: 10
max-wait: 60000
min-idle: 5
redis:
host: localhost
port: 6379
cache:
type: redis
3、自定义RedisSubConfig
往容器(RedisMessageListenerContainer)内添加消息 * ,注意,container的参数列表是可以传多个 * 的,但是要定义 * 的bean。在定义 * 的方法体内绑定消息处理器和管道(channel),一个 * 可以监听多个管道,可以通过数组或者添加多个channel的方式定义;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener
container.addMessageListener(listener, new ChannelTopic("redis.user"));
//container.addMessageListener(listener, new ChannelTopic("redis.news"));
return container;
}
}
4、自定义消息 *
需要实现MessageListener 接口,重写里面的onMessage方法,方法体内需要创建一个MessageListenerAdapter(这是一种规范写法,用于绑定消息处理器和 * )。
这种写法和很多消息中间件对应的API很相似,即通过一个监听的代码块来完成监听到消息后具体的业务操作;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 渠道名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
}
}
5、redistemplate的序列化
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
template.setValueSerializer(jackson2JsonRedisSerializer);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
6、功能测试
下面写一个测试的接口,模拟业务处理完毕后,向 redis.user 这个通道发送一条消息,看看 * 中是否能够正常接收到消息即可;
@RestController
public class RedisPubController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/getUserById")
public String getUserById(String userId){
//TODO 执行主业务
redisTemplate.convertAndSend("redis.user", userId);
return "hello:" + userId;
}
}
启动工程后调用接口:
http://localhost:8083/getByUserIdId?userId=1
通过断点可以观察发现,通道中的消息被 * 逻辑监听到了,接下来就可以愉快的处理自己的业务了;
本篇通过案例演示了如何基于springboot整合使用redis的发布订阅功能,更多的细节需要结合实际的开发场景进行完善
来源:https://blog.csdn.net/congge_study/article/details/126686318


猜你喜欢
- 命令模式命令模式很好理解,举个例子,司令员下令让士兵去干件事情,从整个事情的角度来考虑,司令员的作用是,发出口令,口令经过传递,传到了士兵耳
- No ‘Access-Control-Allow-Origin‘ header is present
- 本文章牵涉到的技术点比较多:spring Data JPA、Redis、Spring MVC,Spirng Cache,所以在看这篇文章的时
- 使用SlidingTabLayout需要准备2个类,分别是 SlidingTabLayout,与SlidingTabStrip,,放进项目中
- 目前较常用的分页实现办法有两种:1.每次翻页都修改SQL,向SQL传入相关参数去数据库实时查出该页的数据并显示。2.查出数据库某张表的全部数
- 一、获取当前时间, 格式为: yyyy-mm-dd hh-mm-ss
- 1.vs中生成dll对应的生成dll的cpp如下 #include<opencv2/opencv.hpp>#inclu
- 有些时候,你必须把某些文件放到Git工作目录中,但又不能提交它们,比如保存了数据库密码的配置文件啦,等等,每次git status都会显示U
- 废话不多说了,直接给大家贴代码了,具体代码如下所示:<update id="updateAuditStateAndType&
- 直接用javaw.exe想打开aspectj-1.9.4.jar安装aspectJ选Java™ Platform SE binary提示JV
- AntPathMatcher前言(1)SpringMVC的路径匹配规则是依照Ant的来的,实际上不只是SpringMVC,整个Spring框
- 目录前言基础组件工作流程初步使用详细流程获取 MapperProxy 对象缓存执行方法构造参数获取需要执行的 SQL 对象执行 SQL 语句
- 不同的事件源可以产生不同类别的事件。例如,按钮可以发送一个ActionEvent对象,而窗口可以发送WindowEvent对象。AWT时间处
- 各位相加给定一个非负整数 num,反复将各个位上的数字相加,直到结果为一位数。示例:输入: 38输出: 2 解释: 各位相加的过程
- Java 异步实现的几种方式1. jdk1.8之前的Futurejdk并发包里的Future代表了未来的某个结果,当我们向线程池中提交任务的
- 1、概述Spring Retry 是Spring框架中的一个组件,它提供了自动重新调用失败操作的能力。这在错误可能是暂时发生的(如瞬时网络故
- 一、Service简介Service是Android程序中四大基础组件之一,它和Activity一样都是Context的子类,只不过它没有U
- 线程中断机制提供了一种方法,用于将线程从阻塞等待中唤醒,尝试打断目标线程的现有处理流程,使之响应新的命令。Java 留给开发者这一自由,我们
- 目录不含return的执行顺序finally子句含return的执行顺序返回类型是对象类型时值的变化结论不含return的执行顺序执行顺序为
- 本文实例讲述了C#通过流写入一行数据到文件的方法。分享给大家供大家参考。具体如下:using System;using System.IO;