Springboot Websocket Stomp 消息订阅推送
作者:代码大师麦克劳瑞 发布时间:2022-05-08 14:39:30
需求背景
闲话不扯,直奔主题。需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议。
websocket协议
想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考。
pom文件
直接引入spring-boot-starter-websocket即可。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
声明websocket endpoint
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @ClassName WebSocketConfig
* @Author scott
* @Date 2021/6/16
* @Version V1.0
**/
@Configuration
public class WebSocketConfig {
/**
* 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
websocket实现类,其中通过注解监听了各种事件,实现了推送消息等相关逻辑
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: DataTypePushWebSocket
* @Author: scott
* @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {
private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);
/**
* 记录当前在线连接数
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
.initialCapacity(10)
.maximumSize(300)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("token")String token) {
String sessionId = session.getId();
onlineCount.incrementAndGet(); // 在线数加1
this.sendMessage("sessionId:" + sessionId +",已经和server建立连接", session);
SESSION_CACHE.put(sessionId,session);
log.info("有新连接加入:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session,@PathParam("token")String token) {
onlineCount.decrementAndGet(); // 在线数减1
SESSION_CACHE.invalidate(session.getId());
log.info("有一连接关闭:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session,@PathParam("token")String token) {
log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
this.sendMessage("服务端已收到推送消息:" + message, session);
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 服务端发送消息给客户端
*/
private static void sendMessage(String message, Session toSession) {
try {
log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("服务端发送消息给客户端失败:{}", e);
}
}
public static AjaxResult sendMessage(String message, String sessionId){
Session session = SESSION_CACHE.getIfPresent(sessionId);
if(Objects.isNull(session)){
return AjaxResult.error("token已失效");
}
sendMessage(message,session);
return AjaxResult.success();
}
public static AjaxResult sendBroadcast(String message){
long size = SESSION_CACHE.size();
if(size <=0){
return AjaxResult.error("当前没有在线客户端,无法推送消息");
}
ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
Set<String> keys = sessionConcurrentMap.keySet();
for (String key : keys) {
Session session = SESSION_CACHE.getIfPresent(key);
DataTypePushWebSocket.sendMessage(message,session);
}
return AjaxResult.success();
}
}
至此websocket服务端代码已经完成。
stomp协议
前端代码.这个是在某个vue工程中写的js,各位大佬自己动手改改即可。其中Settings.wsPath是后端定义的ws地址例如ws://localhost:9003/ws
import Stomp from 'stompjs'
import Settings from '@/settings.js'
export default {
// 是否启用日志 默认启用
debug:true,
// 客户端连接信息
stompClient:{},
// 初始化
init(callBack){
this.stompClient = Stomp.client(Settings.wsPath)
this.stompClient.hasDebug = this.debug
this.stompClient.connect({},suce =>{
this.console("连接成功,信息如下 ↓")
this.console(this.stompClient)
if(callBack){
callBack()
}
},err => {
if(err) {
this.console("连接失败,信息如下 ↓")
this.console(err)
}
})
},
// 订阅
sub(address,callBack){
if(!this.stompClient.connected){
this.console("没有连接,无法订阅")
return
}
// 生成 id
let timestamp= new Date().getTime() + address
this.console("订阅成功 -> "+address)
this.stompClient.subscribe(address,message => {
this.console(address+" 订阅消息通知,信息如下 ↓")
this.console(message)
let data = message.body
callBack(data)
},{
id: timestamp
})
},
unSub(address){
if(!this.stompClient.connected){
this.console("没有连接,无法取消订阅 -> "+address)
return
}
let id = ""
for(let item in this.stompClient.subscriptions){
if(item.endsWith(address)){
id = item
break
}
}
this.stompClient.unsubscribe(id)
this.console("取消订阅成功 -> id:"+ id + " address:"+address)
},
// 断开连接
disconnect(callBack){
if(!this.stompClient.connected){
this.console("没有连接,无法断开连接")
return
}
this.stompClient.disconnect(() =>{
console.log("断开成功")
if(callBack){
callBack()
}
})
},
// 单位 秒
reconnect(time){
setInterval(() =>{
if(!this.stompClient.connected){
this.console("重新连接中...")
this.init()
}
},time * 1000)
},
console(msg){
if(this.debug){
console.log(msg)
}
},
// 向订阅发送消息
send(address,msg) {
this.stompClient.send(address,{},msg)
}
}
后端stomp config,里面都有注释,写的很详细,并且我加入了和前端的心跳ping pong。
package com.cn.scott.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* @ClassName: WebSocketStompConfig
* @Author: scott
* @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
private static long HEART_BEAT=10000;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//允许使用socketJs方式访问,访问点为webSocket,允许跨域
//在网页上我们就可以通过这个链接
//ws://127.0.0.1:port/ws来和服务器的WebSocket连接
registry.addEndpoint("/ws").setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
//基于内存的STOMP消息代理来代替mq的消息代理
//订阅Broker名称,/user代表点对点即发指定用户,/topic代表发布广播即群发
//setHeartbeatValue 设置心跳及心跳时间
registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
//点对点使用的订阅前缀,不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user/");
}
}
后端stomp协议接受、订阅等动作通知
package com.cn.scott.ws;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName StompSocketHandler
* @Author scott
* @Date 2021/6/30
* @Version V1.0
**/
@RestController
public class StompSocketHandler {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MethodName: subscribeMapping
* @Description: 订阅成功通知
* @Param: [id]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@SubscribeMapping("/user/{id}/listener")
public void subscribeMapping(@DestinationVariable("id") final long id) {
System.out.println(">>>>>>用户:"+id +",已订阅");
SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
sendToUser(param);
}
/**
* @MethodName: test
* @Description: 接收订阅topic消息
* @Param: [id, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@MessageMapping(value = "/user/{id}/listener")
public void UserSubListener(@DestinationVariable long id, String msg) {
System.out.println("收到客户端:" +id+",的消息");
SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
sendToUser(param);
}
@GetMapping("/refresh/{userId}")
public void refresh(@PathVariable Long userId, String msg) {
StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服务端向用户【%s】发送消息【%s】", userId,msg));
sendToUser(param);
}
/**
* @MethodName: sendToUser
* @Description: 推送消息给订阅用户
* @Param: [userId]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendToUser(SubscribeMsg screenChangeMsg){
//这里可以控制权限等。。。
simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
}
/**
* @MethodName: sendBroadCast
* @Description: 发送广播,需要用户事先订阅广播
* @Param: [topic, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendBroadCast(String topic,String msg){
simpMessagingTemplate.convertAndSend(topic,msg);
}
/**
* @ClassName: SubMsg
* @Author: scott
* @Date: 2021/6/30
**/
public static class SubscribeMsg {
private Long userId;
private String msg;
public SubscribeMsg(Long UserId, String msg){
this.userId = UserId;
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public String getMsg() {
return msg;
}
}
}
连接展示
建立连接成功,这里可以看出是基于websocket协议
连接信息
ping pong
调用接口向订阅用户1发送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客户端控制台查看已经收到了消息。这个时候不同用户通过自己的userId可以区分订阅的主题,可以做到通过userId精准的往客户端推送消息。
还记得我们在后端配置的时候还指定了广播的订阅主题/topic,这时我们前端通过js只要订阅了这个主题,那么后端在像这个主题推送消息时,所有订阅的客户端都能收到,感兴趣的小伙伴可以自己试试,api我都写好了。
至此,实战完毕,喜欢的小伙伴麻烦关注加点赞。
springboot + stomp后端源码地址:https://gitee.com/ErGouGeSiBaKe/stomp-server
来源:https://blog.csdn.net/u010786653/article/details/118578178
猜你喜欢
- handler在安卓开发中是必须掌握的技术,但是很多人都是停留在使用阶段。使用起来很简单,就两个步骤,在主线程重写handler的handl
- 先要把word或ppt转换为pdf; 以pdf的格式展示,防止文件拷贝。转换方法1、安装Word、Excel、PowerPoint组件注意:
- springboot部署项目在linux的两种方式 可以选择 war包方式或者jar包方式(个人推荐使用jar方式)1.springboot
- 前言背景平时开发中遇到根据当前用户的角色,只能查看数据权限范围的数据需求。列表实现方案有两种,一是在开发初期就做好判断赛选,但如果这个需求是
- 1 前言敏感词过滤就是你在项目中输入某些字(比如输入xxoo相关的文字时)时要能检测出来,很多项目中都会有一个敏感词管理模块,在敏感词管理模
- 设置OpenFeign的FeignClient的Header信息在微服务间使用Feign进行远程调用时需要在 header 中添加信息,那么
- 前言翻看了下以前大学学习的一些小项目,突然发现有个项目比较有意思,觉得有必要把它分享出来。当然现在看来,里面有很多的不足之处,但因博主现在已
- 这篇文章主要介绍了java 对象参数去空格方式代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 这篇文章需要对git具有一定的了解,并且知道如何安装git工具(其实就是上git官网下载个文件,安装)git这种版本控制工具有什么好处第一个
- 本文实例讲述了Java Socket实现传输压缩对象的方法。分享给大家供大家参考,具体如下:前面文章《Java Socket实现的传输对象功
- 随着现在分布式架构越来越盛行,在很多场景下需要使用到分布式锁。很多小伙伴对于分布式锁还不是特别了解,所以特地总结了一篇文章,让大家一文读懂分
- 前些日子,组里为了在目前的Android程序里实现基于ListView子项的动画效果,希望将最新的RecyclerView引入到程序中,于是
- 有个小伙伴遇到了这样一个问题,就是AutoCompleteTextView实现自动填充的功能。同时要具备手机格式化的功能。下拉列表最后一行是
- 在开发过程中,我们需要统一返回前端json格式的数据,但有些接口的返回值存在 null或者""这种没有意义的字段。不仅影
- 需要用到的知识:注解、AOP、ExpiringMap(带有有效期的映射)我们可以自定义注解,把注解添加到我们的接口上。定义一个切面,执行方法
- Java基本概念JDK包含了不少Java开发相关命令。如,javac、java、javap、javaw、javadoc。虽然现在的Java开
- 类加载器类加载器作用负责将.class文件(存储的物理文件)加载到内存中类加载器过程加载:通过全类名获取这个类准备用流传输,加载进内存,加载
- SurfaceView和TextureView均继承于android.view.View与其它View不同的是,两者都能在独立的线程中绘制和
- 开发环境为android4.1.Handler使用例1这个例子是最简单的介绍handler使用的,是将handler绑定到它所建立的线程中.
- 这是我们用得比较多的一种设计模式,也是23种标准设计模式之一,使用前面讲的简单工厂设计模式,遇到具体产品经常变换时就不太适合了,违反了开闭设