软件编程
位置:首页>> 软件编程>> java编程>> SpringBoot整合WebSocket的客户端和服务端的实现代码

SpringBoot整合WebSocket的客户端和服务端的实现代码

作者:愤青程序猿  发布时间:2023-03-30 04:19:07 

标签:SpringBoot,WebSocket,客户端,服务端

本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录。

此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层 产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差别接收这些消息,A的服务端:根据地址ip去订阅。用户通过订阅A的ws,同时记录下自己的信息,项目B推送的消息,项目A接收到之后通过当初订阅的逻辑和一些权限过滤条件对项目B产生的消息进行过滤再推送到用户客户端上。

一、项目中服务端的创建

首先引入maven仓库

<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-websocket</artifactId>
       </dependency>

websocket的服务端搭建

同时注意springboot要开启ws服务

启动类加上@EnableScheduling

简要解读demo

/webSocket/{id}:链接的id是业务上的一个id,这边之前做过类似拍卖的,相当于一个服务端或者业务上的一个标识,是客户端指明链接到哪一个拍卖间的标识

@ServerEndpoint:作为服务端的注解。

package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
   private Logger log = LoggerFactory.getLogger(WebSocket.class);
   private static int onlineCount = 0;
   /** 创建一个map存放   产生的ws链接推送 */
   private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
   /** 创建一个map存放   当前接入的客户端 */
   private static Map<String, String> idMap = new ConcurrentHashMap<>();

private Session session;
   /** 链接进入的一个场景id */
   private String id;
   /** 每一个链接的一个唯一标识 */
   private String userNo;
   /**
   * @Description: 第三方文接入当前项目websocket后的记录信息
   * @DateTime: 2021/7/5 10:02
   * @Author: GHH
   * @Params: [id, session]
   * @Return void
   */
   @OnOpen
   public void onOpen(@PathParam("id") String id, Session session) throws IOException {
       log.info("已连接到id:{}竞拍场,当前竞拍场人数:{}", id, getUserNosById(id).size());
       this.id = id;
       this.session = session;
       // 生成一个随机序列号来存储一个id下的所有用户
       this.userNo = UUID.fastUUID().toString();
       addOnlineCount();
       //根据随机序列号存储一个socket连接
       clients.put(userNo, this);
       idMap.put(userNo, id);
   }
   /**
   * @Description: 关闭连接
   * @DateTime: 2021/7/5 10:02
   * @Author: GHH
   * @Params: []
   * @Return void
   */
   @OnClose
   public void onClose() throws IOException {
       clients.remove(userNo);
       idMap.remove(userNo);
       subOnlineCount();
   }
   /**
   * @Description: 客户端发送消息调用此方法
   * @DateTime: 2021/6/16 15:35
   * @Author: GHH
   * @Params: [message]
   * @Return void
   */
   @OnMessage
   public void onMessage(String message) throws IOException {
//        JSONObject jsonTo = JSONObject.parseObject(message);
//        String mes = (String) jsonTo.get("message");
//        if (!("All").equals(jsonTo.get("To"))) {
//            sendMessageTo(mes, jsonTo.get("To").toString());
//        } else {
//            sendMessageAll(message);
//        }
       log.info("onMessage方法成功");
   }
   @OnError
   public void onError(Session session, Throwable error) {
       log.error("{}", error);
   }
   public static void sendMessageTo(String message, String userNo) throws IOException {
       // session.getBasicRemote().sendText(message);
       //session.getAsyncRemote().sendText(message);
       WebSocket webSocket = clients.get(userNo);
       if (webSocket != null && webSocket.session.isOpen()) {
           webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
       }
   }
   /**
   * @Description: 推送到指定的id值的记录
   * @DateTime: 2021/6/15 17:11
   * @Author: GHH
   * @Params: [message, id]
   * @Return void
   */
   public static void sendMessageToById(String message, String id) {
       // session.getBasicRemote().sendText(message);
       //session.getAsyncRemote().sendText(message);
       //根据id获取所有的userNo链接的用户
       List<String> userNos = getUserNosById(id);
       for (WebSocket item : clients.values()) {
           //遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。
           if (userNos.contains(item.userNo)) {
               item.session.getAsyncRemote().sendText(message);
           }
       }
   }
   /**
   * @Description: 推送所有开启的信息
   * @DateTime: 2021/6/15 17:13
   * @Author: GHH
   * @Params: [message]
   * @Return void
   */
   public static void sendMessageAll(String message){
       for (WebSocket item : clients.values()) {
           item.session.getAsyncRemote().sendText(message);
       }
   }
   public static synchronized int getOnlineCount() {
       return onlineCount;
   }
   public static synchronized void addOnlineCount() {
       WebSocket.onlineCount++;
   }
   public static synchronized void subOnlineCount() {
       WebSocket.onlineCount--;
   }
   public static synchronized Map<String, WebSocket> getClients() {
       return clients;
   }
   /**
   * @Description: 根据相应场景的一些逻辑处理
   * @DateTime: 2021/7/5 10:03
   * @Author: GHH
   * @Params: [id]
   * @Return java.util.List<java.lang.String>
   */
   public static List<String> getUserNosById(String id) {
       ArrayList<String> userNos = new ArrayList<>();
       for (Map.Entry<String, String> entry : idMap.entrySet()) {
           if (entry.getValue().equals(id)) {
               userNos.add(entry.getKey());
           }
       }
       return userNos;
   }
}

demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端

WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?")
   public void job1(){
       log.info("测试生成次数:{}",count);
       redisTemplate.opsForValue().set("测试"+count, ""+count++);
       if (count%2==0){
           WebSocket.sendMessageToById(""+count,2+"");
       }else {
           WebSocket.sendMessageToById(""+count,1+"");
       }

log.info("websocket发送"+count);
   }

二、java充当客户端链接ws

上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。

ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务

1、ws客户端的配置

package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
/**
* @author ghh
* @date 2019-08-16 16:02
*/
@Component
@Slf4j
public class WSClient {
   public static Session session;
   public static void startWS() {
       try {
           if (WSClient.session != null) {
               WSClient.session.close();
           }
           WebSocketContainer container = ContainerProvider.getWebSocketContainer();
           //设置消息大小最大为10M
           container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
           container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
           // 客户端,开启服务端websocket。
           String uri = "ws://192.168.0.108:8082/webSocket/1";
           Session session = container.connectToServer(WSHandler.class, URI.create(uri));
           WSClient.session = session;
       } catch (Exception ex) {
           log.info(ex.getMessage());
       }
   }
}

2、配置信息需要在项目启动的时候去启用和链接ws服务

package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
   public static void main(String[] args) {
       SpringApplication.run(WebsocketReciveApplication.class, args);
   }
   @PostConstruct
   public void init(){
       log.info("初始化应用程序");// 初始化ws,链接服务端
       WSClient.startWS();
   }
}

3、接收服务端推送的消息进行权限过滤demo

@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。

package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
   @Autowired
   RedisTemplate<String,String> redisTemplate;
   private static RedisTemplate<String,String> redisTemplateService;
   @PostConstruct
   public void init() {
       redisTemplateService=redisTemplate;
   }
   @OnOpen
   public void onOpen(Session session) {
       WSClient.session = session;
   }
   @OnMessage
   public void processMessage(String message) {
       log.info("websocketRecive接收推送消息"+message);
       int permission = Integer.parseInt(message)%5;
       //查询所有订阅的客户端的ip。
       Set<String> keys = redisTemplateService.keys("ip:*");
       for (String key : keys) {
           // 根据登录后存储的客户端ip,获取权限地址
           String s = redisTemplateService.opsForValue().get(key);
           String[] split = s.split(",");
           for (String s1 : split) {
               //向含有推送过来的数据权限地址的客户端推送告警数据。
               if (s1.equals(permission+"")){
                   WebSocket.sendMessageToByIp(message,key.split(":")[1]);
               }
           }
       }
   }
   @OnError
   public void processError(Throwable t) {
       WSClient.session = null;
       try {
           Thread.sleep(5000);
           startWS();
       } catch (InterruptedException e) {
           log.error("---websocket processError InterruptedException---", e);
       }
       log.error("---websocket processError error---", t);
   }
   @OnClose
   public void processClose(Session session, CloseReason closeReason) {
       log.error(session.getId() + closeReason.toString());
   }
   public void send(String sessionId, String message) {
       try {
           log.info("send Msg:" + message);
           if (Objects.nonNull(WSClient.session)) {
               WSClient.session.getBasicRemote().sendText(message);
           } else {
               log.info("---websocket error----");
           }
       } catch (Exception e) {
           log.error("---websocket send error---", e);
       }
   }
}

4、ws客户端推送消息,推送消息和上面服务端类似。

这边是根据ip

package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
   private Logger log = LoggerFactory.getLogger(WebSocket.class);
   private static int onlineCount = 0;
   private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
   private Session session;
   /** 当前连接服务端的客户端ip */
   private String ip;
   @Autowired
   RedisTemplate<String,String> redisTemplate;
   private static RedisTemplate<String,String> redisTemplateService;
   @PostConstruct
   public void init() {
       redisTemplateService = redisTemplate;
   }
   @OnOpen
   public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
       log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1);
       this.ip = ip;
       this.session = session;
       // 接入一个websocket则生成一个随机序列号
       addOnlineCount();
       //根据随机序列号存储一个socket连接
       clients.put(ip, this);
   }
   @OnClose
   public void onClose() throws IOException {
       clients.remove(ip);
       onlineCount--;
       subOnlineCount();
   }
   /**
   * @Description: 客户端发送消息调用此方法
   * @DateTime: 2021/6/16 15:35
   * @Author: GHH
   * @Params: [message]
   * @Return void
   */
   @OnMessage
   public void onMessage(String message) throws IOException {
       log.info("客户端发送消onMessage方法成功");
   }
   @OnError
   public void onError(Session session, Throwable error) {
       log.error("{}", error);
   }
   public static void sendMessageTo(String message, String userNo) throws IOException {
       WebSocket webSocket = clients.get(userNo);
       if (webSocket != null && webSocket.session.isOpen()) {
           webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
       }
   }
   /**
   * @Description: 推送到指定的ip值的记录
   * @DateTime: 2021/6/15 17:11
   * @Author: GHH
   * @Params: [message, id]
   * @Return void
   */
   public static void sendMessageToByIp(String message, String ip) {
       for (WebSocket item : clients.values()) {
           //遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。
           if (item.ip.equals(ip)) {
               item.session.getAsyncRemote().sendText(message);
           }
       }
   }
   /**
   * @Description: 推送所有开启的信息
   * @DateTime: 2021/6/15 17:13
   * @Author: GHH
   * @Params: [message]
   * @Return void
   */
   public static void sendMessageAll(String message){
       for (WebSocket item : clients.values()) {
           item.session.getAsyncRemote().sendText(message);
       }
   }
   public static synchronized int getOnlineCount() {
       return onlineCount;
   }
   public static synchronized void addOnlineCount() {
       WebSocket.onlineCount++;
   }
   public static synchronized void subOnlineCount() {
       WebSocket.onlineCount--;
   }
   public static synchronized Map<String, WebSocket> getClients() {
       return clients;
   }
}

概述:

至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git

来源:https://www.cnblogs.com/guanyuehao0107/p/14971494.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com