软件编程
位置:首页>> 软件编程>> java编程>> SpringBoot+netty-socketio实现服务器端消息推送

SpringBoot+netty-socketio实现服务器端消息推送

作者:ATwill...  发布时间:2023-11-15 06:14:31 

标签:SpringBoot,netty-socketio,服务器端,推送

首先:因为工作需要对接socket.io框架,所以目前只能使用netty-socketio。原生websocket是不支持直接对接socket.io框架的。

netty-socketio顾名思义,它是一个底层基于netty实现的socket.io服务端。

在springboot项目中的集成,请看下面的代码

maven依赖


<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency>

 下面就是代码了

首先是配置参数


#socketio配置
socketio:
  host: localhost
  port: 9099
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口,boss线程组为1即可)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000

上面的注释写得很清楚。下面是config代码


import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* kcm
*/
@Component
public class PushServer implements InitializingBean {

@Autowired
 private EventListenner eventListenner;

@Value("${socketio.port}")
 private int serverPort;

@Value("${socketio.host}")
 private String serverHost;

@Value("${socketio.bossCount}")
 private int bossCount;

@Value("${socketio.workCount}")
 private int workCount;

@Value("${socketio.allowCustomRequests}")
 private boolean allowCustomRequests;

@Value("${socketio.upgradeTimeout}")
 private int upgradeTimeout;

@Value("${socketio.pingTimeout}")
 private int pingTimeout;

@Value("${socketio.pingInterval}")
 private int pingInterval;

@Override
 public void afterPropertiesSet() throws Exception {
   Configuration config = new Configuration();
   config.setPort(serverPort);
   config.setHostname(serverHost);
   config.setBossThreads(bossCount);
   config.setWorkerThreads(workCount);
   config.setAllowCustomRequests(allowCustomRequests);
   config.setUpgradeTimeout(upgradeTimeout);
   config.setPingTimeout(pingTimeout);
   config.setPingInterval(pingInterval);

SocketConfig socketConfig = new SocketConfig();
   socketConfig.setReuseAddress(true);
   socketConfig.setTcpNoDelay(true);
   socketConfig.setSoLinger(0);
   config.setSocketConfig(socketConfig);

SocketIOServer server = new SocketIOServer(config);
   server.addListeners(eventListenner);
   server.start();
   System.out.println("启动正常");
 }
}

再就是监听代码


import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.apache.commons.lang3.StringUtils;
import org.bangying.auth.JwtSupport;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class EventListenner {
 @Resource
 private ClientCache clientCache;

@Resource
 private JwtSupport jwtSupport;

/**
  * 客户端连接
  *
  * @param client
  */
 @OnConnect
 public void onConnect(SocketIOClient client) {
   String userId = client.getHandshakeData().getSingleUrlParam("userId");
//    userId = jwtSupport.getApplicationUser().getId().toString();
//    userId = "8";
   UUID sessionId = client.getSessionId();
   clientCache.saveClient(userId, sessionId, client);
   System.out.println("建立连接");
 }

/**
  * 客户端断开
  *
  * @param client
  */
 @OnDisconnect
 public void onDisconnect(SocketIOClient client) {
   String userId = client.getHandshakeData().getSingleUrlParam("userId");
   if (StringUtils.isNotBlank(userId)) {
     clientCache.deleteSessionClient(userId, client.getSessionId());
     System.out.println("关闭连接");
   }
 }

//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息
 // 暂未使用
 @OnEvent("messageevent")
 public void onEvent(SocketIOClient client, AckRequest request) {
 }
}

本地缓存信息


import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* kcm
*/
@Component
public class ClientCache {

//本地缓存
 private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
 /**
  * 存入本地缓存
  * @param userId 用户ID
  * @param sessionId 页面sessionID
  * @param socketIOClient 页面对应的通道连接信息
  */
 public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
   if(StringUtils.isNotBlank(userId)){
     HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
     if(sessionIdClientCache==null){
       sessionIdClientCache = new HashMap<>();
     }
     sessionIdClientCache.put(sessionId,socketIOClient);
     concurrentHashMap.put(userId,sessionIdClientCache);
   }
 }
 /**
  * 根据用户ID获取所有通道信息
  * @param userId
  * @return
  */
 public HashMap<UUID, SocketIOClient> getUserClient(String userId){
   return concurrentHashMap.get(userId);
 }
 /**
  * 根据用户ID及页面sessionID删除页面链接信息
  * @param userId
  * @param sessionId
  */
 public void deleteSessionClient(String userId,UUID sessionId){
   concurrentHashMap.get(userId).remove(sessionId);
 }
}

下面是存储客户端连接信息


import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* kcm
*/
@Component
public class ClientCache {

//本地缓存
 private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
 /**
  * 存入本地缓存
  * @param userId 用户ID
  * @param sessionId 页面sessionID
  * @param socketIOClient 页面对应的通道连接信息
  */
 public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
   if(StringUtils.isNotBlank(userId)){
     HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
     if(sessionIdClientCache==null){
       sessionIdClientCache = new HashMap<>();
     }
     sessionIdClientCache.put(sessionId,socketIOClient);
     concurrentHashMap.put(userId,sessionIdClientCache);
   }
 }
 /**
  * 根据用户ID获取所有通道信息
  * @param userId
  * @return
  */
 public HashMap<UUID, SocketIOClient> getUserClient(String userId){
   return concurrentHashMap.get(userId);
 }
 /**
  * 根据用户ID及页面sessionID删除页面链接信息
  * @param userId
  * @param sessionId
  */
 public void deleteSessionClient(String userId,UUID sessionId){
   concurrentHashMap.get(userId).remove(sessionId);
 }
}

控制层推送方法


@RestController
@RequestMapping("/push")
public class PushController {
 @Resource
 private ClientCache clientCache;

@Autowired
 private JwtSupport jwtSupport;

@GetMapping("/message")
 public String pushTuUser(@Param("id") String id){
   Integer userId = jwtSupport.getApplicationUser().getId();
   HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(String.valueOf(userId));
   userClient.forEach((uuid, socketIOClient) -> {
     //向客户端推送消息
     socketIOClient.sendEvent("chatevent","服务端推送消息");
   });
   return "success";
 }
}

来源:https://blog.csdn.net/kang649882/article/details/104840441/

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com