SpringBoot整合WebSocket的客户端和服务端的实现代码
作者:愤青程序猿 发布时间:2023-03-30 04:19:07
本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录。
此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层 产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差别接收这些消息,A的服务端:根据地址ip去订阅。用户通过订阅A的ws,同时记录下自己的信息,项目B推送的消息,项目A接收到之后通过当初订阅的逻辑和一些权限过滤条件对项目B产生的消息进行过滤再推送到用户客户端上。
一、项目中服务端的创建
首先引入maven仓库
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
websocket的服务端搭建
同时注意springboot要开启ws服务
启动类加上@EnableScheduling
简要解读demo
/webSocket/{id}:链接的id是业务上的一个id,这边之前做过类似拍卖的,相当于一个服务端或者业务上的一个标识,是客户端指明链接到哪一个拍卖间的标识
@ServerEndpoint:作为服务端的注解。
package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
private Logger log = LoggerFactory.getLogger(WebSocket.class);
private static int onlineCount = 0;
/** 创建一个map存放 产生的ws链接推送 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
/** 创建一个map存放 当前接入的客户端 */
private static Map<String, String> idMap = new ConcurrentHashMap<>();
private Session session;
/** 链接进入的一个场景id */
private String id;
/** 每一个链接的一个唯一标识 */
private String userNo;
/**
* @Description: 第三方文接入当前项目websocket后的记录信息
* @DateTime: 2021/7/5 10:02
* @Author: GHH
* @Params: [id, session]
* @Return void
*/
@OnOpen
public void onOpen(@PathParam("id") String id, Session session) throws IOException {
log.info("已连接到id:{}竞拍场,当前竞拍场人数:{}", id, getUserNosById(id).size());
this.id = id;
this.session = session;
// 生成一个随机序列号来存储一个id下的所有用户
this.userNo = UUID.fastUUID().toString();
addOnlineCount();
//根据随机序列号存储一个socket连接
clients.put(userNo, this);
idMap.put(userNo, id);
}
/**
* @Description: 关闭连接
* @DateTime: 2021/7/5 10:02
* @Author: GHH
* @Params: []
* @Return void
*/
@OnClose
public void onClose() throws IOException {
clients.remove(userNo);
idMap.remove(userNo);
subOnlineCount();
}
/**
* @Description: 客户端发送消息调用此方法
* @DateTime: 2021/6/16 15:35
* @Author: GHH
* @Params: [message]
* @Return void
*/
@OnMessage
public void onMessage(String message) throws IOException {
// JSONObject jsonTo = JSONObject.parseObject(message);
// String mes = (String) jsonTo.get("message");
// if (!("All").equals(jsonTo.get("To"))) {
// sendMessageTo(mes, jsonTo.get("To").toString());
// } else {
// sendMessageAll(message);
// }
log.info("onMessage方法成功");
}
@OnError
public void onError(Session session, Throwable error) {
log.error("{}", error);
}
public static void sendMessageTo(String message, String userNo) throws IOException {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
WebSocket webSocket = clients.get(userNo);
if (webSocket != null && webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
}
/**
* @Description: 推送到指定的id值的记录
* @DateTime: 2021/6/15 17:11
* @Author: GHH
* @Params: [message, id]
* @Return void
*/
public static void sendMessageToById(String message, String id) {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
//根据id获取所有的userNo链接的用户
List<String> userNos = getUserNosById(id);
for (WebSocket item : clients.values()) {
//遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。
if (userNos.contains(item.userNo)) {
item.session.getAsyncRemote().sendText(message);
}
}
}
/**
* @Description: 推送所有开启的信息
* @DateTime: 2021/6/15 17:13
* @Author: GHH
* @Params: [message]
* @Return void
*/
public static void sendMessageAll(String message){
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
public static synchronized Map<String, WebSocket> getClients() {
return clients;
}
/**
* @Description: 根据相应场景的一些逻辑处理
* @DateTime: 2021/7/5 10:03
* @Author: GHH
* @Params: [id]
* @Return java.util.List<java.lang.String>
*/
public static List<String> getUserNosById(String id) {
ArrayList<String> userNos = new ArrayList<>();
for (Map.Entry<String, String> entry : idMap.entrySet()) {
if (entry.getValue().equals(id)) {
userNos.add(entry.getKey());
}
}
return userNos;
}
}
demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端
WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?")
public void job1(){
log.info("测试生成次数:{}",count);
redisTemplate.opsForValue().set("测试"+count, ""+count++);
if (count%2==0){
WebSocket.sendMessageToById(""+count,2+"");
}else {
WebSocket.sendMessageToById(""+count,1+"");
}
log.info("websocket发送"+count);
}
二、java充当客户端链接ws
上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。
ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务
1、ws客户端的配置
package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
/**
* @author ghh
* @date 2019-08-16 16:02
*/
@Component
@Slf4j
public class WSClient {
public static Session session;
public static void startWS() {
try {
if (WSClient.session != null) {
WSClient.session.close();
}
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
//设置消息大小最大为10M
container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
// 客户端,开启服务端websocket。
String uri = "ws://192.168.0.108:8082/webSocket/1";
Session session = container.connectToServer(WSHandler.class, URI.create(uri));
WSClient.session = session;
} catch (Exception ex) {
log.info(ex.getMessage());
}
}
}
2、配置信息需要在项目启动的时候去启用和链接ws服务
package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
public static void main(String[] args) {
SpringApplication.run(WebsocketReciveApplication.class, args);
}
@PostConstruct
public void init(){
log.info("初始化应用程序");// 初始化ws,链接服务端
WSClient.startWS();
}
}
3、接收服务端推送的消息进行权限过滤demo
@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。
package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
@Autowired
RedisTemplate<String,String> redisTemplate;
private static RedisTemplate<String,String> redisTemplateService;
@PostConstruct
public void init() {
redisTemplateService=redisTemplate;
}
@OnOpen
public void onOpen(Session session) {
WSClient.session = session;
}
@OnMessage
public void processMessage(String message) {
log.info("websocketRecive接收推送消息"+message);
int permission = Integer.parseInt(message)%5;
//查询所有订阅的客户端的ip。
Set<String> keys = redisTemplateService.keys("ip:*");
for (String key : keys) {
// 根据登录后存储的客户端ip,获取权限地址
String s = redisTemplateService.opsForValue().get(key);
String[] split = s.split(",");
for (String s1 : split) {
//向含有推送过来的数据权限地址的客户端推送告警数据。
if (s1.equals(permission+"")){
WebSocket.sendMessageToByIp(message,key.split(":")[1]);
}
}
}
}
@OnError
public void processError(Throwable t) {
WSClient.session = null;
try {
Thread.sleep(5000);
startWS();
} catch (InterruptedException e) {
log.error("---websocket processError InterruptedException---", e);
}
log.error("---websocket processError error---", t);
}
@OnClose
public void processClose(Session session, CloseReason closeReason) {
log.error(session.getId() + closeReason.toString());
}
public void send(String sessionId, String message) {
try {
log.info("send Msg:" + message);
if (Objects.nonNull(WSClient.session)) {
WSClient.session.getBasicRemote().sendText(message);
} else {
log.info("---websocket error----");
}
} catch (Exception e) {
log.error("---websocket send error---", e);
}
}
}
4、ws客户端推送消息,推送消息和上面服务端类似。
这边是根据ip
package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
private Logger log = LoggerFactory.getLogger(WebSocket.class);
private static int onlineCount = 0;
private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
private Session session;
/** 当前连接服务端的客户端ip */
private String ip;
@Autowired
RedisTemplate<String,String> redisTemplate;
private static RedisTemplate<String,String> redisTemplateService;
@PostConstruct
public void init() {
redisTemplateService = redisTemplate;
}
@OnOpen
public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1);
this.ip = ip;
this.session = session;
// 接入一个websocket则生成一个随机序列号
addOnlineCount();
//根据随机序列号存储一个socket连接
clients.put(ip, this);
}
@OnClose
public void onClose() throws IOException {
clients.remove(ip);
onlineCount--;
subOnlineCount();
}
/**
* @Description: 客户端发送消息调用此方法
* @DateTime: 2021/6/16 15:35
* @Author: GHH
* @Params: [message]
* @Return void
*/
@OnMessage
public void onMessage(String message) throws IOException {
log.info("客户端发送消onMessage方法成功");
}
@OnError
public void onError(Session session, Throwable error) {
log.error("{}", error);
}
public static void sendMessageTo(String message, String userNo) throws IOException {
WebSocket webSocket = clients.get(userNo);
if (webSocket != null && webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
}
/**
* @Description: 推送到指定的ip值的记录
* @DateTime: 2021/6/15 17:11
* @Author: GHH
* @Params: [message, id]
* @Return void
*/
public static void sendMessageToByIp(String message, String ip) {
for (WebSocket item : clients.values()) {
//遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。
if (item.ip.equals(ip)) {
item.session.getAsyncRemote().sendText(message);
}
}
}
/**
* @Description: 推送所有开启的信息
* @DateTime: 2021/6/15 17:13
* @Author: GHH
* @Params: [message]
* @Return void
*/
public static void sendMessageAll(String message){
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
public static synchronized Map<String, WebSocket> getClients() {
return clients;
}
}
概述:
至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git
来源:https://www.cnblogs.com/guanyuehao0107/p/14971494.html
猜你喜欢
- 目录一、连接查询:1、多对一:2、一对多:3、多对多:二、嵌套查询:1、多对一:2、一对多:首先在mysql中确立表:#表一:地址国家表CR
- 规则1(无继承情况下):对于静态变量、静态初始化块、变量、初始化块、构造器,它们的初始化顺序依次是(静态变量、静态初始化块)>(变量、
- 问题(1)重入锁是什么?(2)ReentrantLock如何实现重入锁?(3)ReentrantLock为什么默认是非公平模式?(4)Ree
- 这篇文章主要介绍了Java实现发送手机短信语音验证功能代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,
- java static块和构造函数的实例详解构造函数不写时,若该类继续了某个类则会默认集成父类的构造函数。 构造函数在实例化类时执行内部,O
- 获得redis所有的key-value运行结果:redis配置文件需要序列化@Bean public RedisT
- C# using 三种使用方式介绍1.using指令。using + 命名空间名字,这样可以在程序中直接用命令空间中的类型,而不必指定类型的
- 1.什么是结构化编程编程中只使用三大结构不能使用goto语句1972年美国科学家,发表论文证明所有的程序流程,只需要三大结构完成。2.为什么
- 线程可以划分优先级,优先级高的线程得到的CPU资源比较多,也就是CPU优先执行优先级高的线程对象中的任务。设置线程优先级有助于帮助线程规划器
- 我们在编写完Spring的代码后,往往需要测试代码的正确性,这个时候就需要用到单元测试了。我们这里使用的版本是junit4.一个程序的入口是
- 一.static关键字的用途在《Java编程思想》P86页有这样一段话:“static方法就是没有this的方法。在st
- 本文实例讲述了Java数组传递及可变参数操作。分享给大家供大家参考,具体如下:方法可以操作传递和返回基本数据类型,但是方法中也可用来传递和返
- 请求SpringBoot接受前台参数的六种方式,首先因为从前台发送的请求没有界面的话只能是从地址栏发送并且只能是Get请求,为了测试其他的请
- 背景产品想对多次快速点击做一下优化,想要的效果就是双击不会打开多次但是从开发角度来说,我可以用kotlin的拓展方法来调整这个,但是之前的历
- 本文实例讲述了java GUI编程之paint绘制操作。分享给大家供大家参考,具体如下:import java.awt.*;public c
- 本文实例讲述了C#编程实现自定义热键的方法。分享给大家供大家参考。具体实现方法如下:using System;using System.Co
- 这篇文章主要介绍了通过实例了解Java 8创建Stream流的5种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学
- 简介TreeMap使用红黑树存储元素,可以保证元素按key值的大小进行遍历。继承体系TreeMap实现了Map、SortedMap、Navi
- 在一般性开发中,笔者经常看到很多同学在对待java并发开发模型中只会使用一些基础的方法。比如Volatile,synchronized。像L
- 1、JavaBean介绍 * JavaBean的定义:JavaBeans是Java中一种特殊的类,可以将多个对象封装到一个对象(bean)