springboot基于Redis发布订阅集群下WebSocket的解决方案
作者:毅大师 发布时间:2023-07-14 04:37:45
标签:springboot,WebSocket,集群
一、背景
单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。
假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。
为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。
二、解决方案(springboot 基于 Redis发布订阅)
1、依赖
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。
import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
* @program:
* @description: redis消息订阅-业务处理
* @author: zhang yi
* @create: 2021-01-25 16:46
*/
@Component
public class Demo implements MessageListener {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(Message message, byte[] pattern) {
logger.info("消息订阅成功---------");
logger.info("内容:"+message.getBody());
logger.info("交换机:"+message.getChannel());
}
}
3、创建PubSubConfig配置类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @program:
* @description: redis发布订阅配置
* @author: zhang yi
* @create: 2021-01-25 16:49
*/
@Configuration
@EnableCaching
public class PubSubConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());
//如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory,
// MessageListenerAdapter listenerAdapter,
// MessageListenerAdapter listenerAdapter2)
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
//container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
return container;
}
/**
* 消息 * 适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* @param demo 第一步的业务处理类
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(Demo demo) {
logger.info("----------------消息 * 加载成功----------------");
// onMessage 就是方法名,基于反射调用
return new MessageListenerAdapter(demo, "onMessage");
}
/**
* 多个交换机就多写一个
* @param subCheckOrder
* @return
*/
//@Bean
//MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
// logger.info("----------------消息 * 加载成功----------------");
// return new MessageListenerAdapter(subCheckOrder, "onMessage");
//}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
4、消息发布
@Autowired
private RedisTemplate<String, Object> redisTemplate;
redisTemplate.convertAndSend("channel:demo", "我是内容");
三、具体用法
socket连接成功。
socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。
来源:https://blog.csdn.net/qq_39648029/article/details/113175132


猜你喜欢
- 阿姆斯特朗数阿姆斯特朗数是一个数字,等于每个数字的幂乘以总位数。 例如,诸如0、1、153、370、371和407、1634、8208、94
- 文章按照 Socket 的 创建、连接、传输数据、释放资源的过程来写。给出方法、参数的详细信息。一,网络基础说到 Socket,需要学习一下
- 本文实例讲述了Android编程判断网络是否可用及调用系统设置项的方法。分享给大家供大家参考,具体如下:private boolean ch
- 需求基于MTK8163 8.1平台定制导航栏部分,在左边增加音量减,右边增加音量加思路需求开始做之前,一定要研读SystemUI Navig
- 一:百度高德官方网站,然后去创建应用网址:http://lbs.amap.com/1.找到控制台创建一个应用2.添加key名称,注意命名规范
- clone()和Cloneable接口clone顾名思义就是克隆,即,复制一个相等的对象,但是不同的引用地址。我们知道拿到一个对象的地址,只
- 在消息通知的时候,我们经常用到两个控件Notification和Toast。特
- Java在1.5开始引入了注解,目前流行的框架都在用注解,可想而知注解的强大之处。以下通过自定义注解来深入了解java注解。一、创建自定义注
- 1.以ApplocationContext上下文单例模式装配bean为例,深入探讨bean的生命周期:(1).生命周期图:(2).具体事例:
- 根据数据库表名生成实体类公司用的jpa,没有用mybatis。所以也没有用mybatis自动生成。但有些数据库表字段太多,就想着一劳永逸了,
- //C# 根据当前时间获取本周、下周、本月、下月、本季度等时间段DateTime dt = DateTime.Now; //当前
- 在工作过程中,需要将一个文件夹生成压缩文件,然后提供给用户下载。所以自己写了一个压缩文件的工具类。该工具类支持单个文件和文件夹压缩。放代码:
- ListView在虚模式下,CheckBox无法点击,可以用此方法实现private void lvwTitle_MouseDown(obj
- 前言最近在看王清培前辈的.NET框架设计时,当中有提到扩展方法 .开头的一句话是:扩展方法是让我们在不改变类原有代码的情况下动态地添加方法的
- 活锁与死锁活锁活锁同样会发生在多个相互协作的线程间,当他们为了彼此间的响应而相互礼让,使得没有一个线程能够继续前进,那么就发生了活锁。同死锁
- 前言跳过废话,直接看正文当年入坑Java是因为它的跨平台优势。那时我认为,”编写一次,处处运行。”这听上去多么牛逼,应该是所有语言发展的终极
- android提供的工具链和开发工具比较完善,因此它的开发环境的搭建比较简单,相信许多朋友都已经搭建好环境,并编写了HelloA
- springmvc 中的 * 可以对请求进行判别, 在请求到达控制器之前, 把非法的请求给拦截掉下面来说一说, 它在springboot中的
- 在实现蓝牙接收时,网上的资料很多,使用起来也很简单,但是我觉得还是有必要把这些知识总结下来。蓝牙开发需要用到一个第三方的库InTheHand
- 1.引言合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务