SpringBoot+Netty+WebSocket实现消息发送的示例代码
作者:阿杜同学 发布时间:2023-08-16 00:02:52
标签:SpringBoot,Netty,WebSocket,消息发送
一.导入Netty依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
二.搭建websocket服务器
@Component
public class WebSocketServer {
/**
* 主线程池
*/
private EventLoopGroup bossGroup;
/**
* 工作线程池
*/
private EventLoopGroup workerGroup;
/**
* 服务器
*/
private ServerBootstrap server;
/**
* 回调
*/
private ChannelFuture future;
public void start() {
future = server.bind(9001);
System.out.println("netty server - 启动成功");
}
public WebSocketServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketInitializer());
}
}
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ------------------
// 用于支持Http协议
// ------------------
// websocket基于http协议,需要有http的编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用
//设置单次请求的文件的大小
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
//webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加Netty空闲超时检查的支持
// 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
// 2. 写空闲超时
// 3. 读写空闲超时
pipeline.addLast(new IdleStateHandler(4, 8, 12));
//添加心跳处理
pipeline.addLast(new HearBeatHandler());
// 添加自定义的handler
pipeline.addLast(new ChatHandler());
}
}
四.创建Netty *
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private WebSocketServer websocketServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(event.getApplicationContext().getParent() == null) {
try {
websocketServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
五.建立消息通道
public class UserChannelMap {
/**
* 用户保存用户id与通道的Map对象
*/
// private static Map<String, Channel> userChannelMap;
/* static {
userChannelMap = new HashMap<String, Channel>();
}*/
/**
* 定义一个channel组,管理所有的channel
* GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放用户与Chanel的对应信息,用于给指定用户发送消息
*/
private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();
private UserChannelMap(){}
/**
* 添加用户id与channel的关联
* @param userNum
* @param channel
*/
public static void put(String userNum, Channel channel) {
userChannelMap.put(userNum, channel);
}
/**
* 根据用户id移除用户id与channel的关联
* @param userNum
*/
public static void remove(String userNum) {
userChannelMap.remove(userNum);
}
/**
* 根据通道id移除用户与channel的关联
* @param channelId 通道的id
*/
public static void removeByChannelId(String channelId) {
if(!StringUtils.isNotBlank(channelId)) {
return;
}
for (String s : userChannelMap.keySet()) {
Channel channel = userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())) {
System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
userChannelMap.remove(s);
UserService userService = SpringUtil.getBean(UserService.class);
userService.logout(s);
break;
}
}
}
/**
* 打印所有的用户与通道的关联数据
*/
public static void print() {
for (String s : userChannelMap.keySet()) {
System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
}
}
/**
* 根据好友id获取对应的通道
* @param receiverNum 接收人编号
* @return Netty通道
*/
public static Channel get(String receiverNum) {
return userChannelMap.get(receiverNum);
}
/**
* 获取channel组
* @return
*/
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
/**
* 获取用户channel map
* @return
*/
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
return userChannelMap;
}
}
六.自定义消息类型
public class Message {
/**
* 消息类型
*/
private Integer type;
/**
* 聊天消息
*/
private String message;
/**
* 扩展消息字段
*/
private Object ext;
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public MarketChatRecord getChatRecord() {
return marketChatRecord;
}
public void setChatRecord(MarketChatRecord chatRecord) {
this.marketChatRecord = chatRecord;
}
public Object getExt() {
return ext;
}
public void setExt(Object ext) {
this.ext = ext;
}
@Override
public String toString() {
return "Message{" +
"type=" + type +
", marketChatRecord=" + marketChatRecord +
", ext=" + ext +
'}';
}
}
七.创建处理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 用来保存所有的客户端连接
*/
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
*当Channel中有新的事件消息会自动调用
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 当接收到数据后会自动调用
// 获取客户端发送过来的文本消息
Gson gson = new Gson();
log.info("服务器收到消息:{}",msg.text());
System.out.println("接收到消息数据为:" + msg.text());
Message message = gson.fromJson(msg.text(), Message.class);
//根据业务要求进行消息处理
switch (message.getType()) {
// 处理客户端连接的消息
case 0:
// 建立用户与通道的关联
// 处理客户端发送好友消息
break;
case 1:
// 处理客户端的签收消息
break;
case 2:
// 将消息记录设置为已读
break;
case 3:
// 接收心跳消息
break;
default:
break;
}
}
// 当有新的客户端连接服务器之后,会自动调用这个方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
// 添加到channelGroup 通道组
UserChannelMap.getChannelGroup().add(ctx.channel());
// clients.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("{异常:}"+cause.getMessage());
// 删除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
ctx.channel().close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
//删除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
UserChannelMap.print();
}
}
八.处理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
if(idleStateEvent.state() == IdleState.READER_IDLE) {
System.out.println("读空闲事件触发...");
}
else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("写空闲事件触发...");
}
else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
System.out.println("---------------");
System.out.println("读写空闲事件触发");
System.out.println("关闭通道资源");
ctx.channel().close();
}
}
}
}
搭建完成后调用测试
1.页面访问http://localhost:9001/ws
2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。
来源:https://blog.csdn.net/qq_35142561/article/details/108664780


猜你喜欢
- 环境配置:jdk1.8mybatis3.4.1springboot2.0起始原因:编写mybatis的Demo程序时,mapper传递多参数
- 斗地主小游戏之洗牌发牌任务描述编写一个斗地主发牌洗牌的程序,要求按照斗地主的规则完成洗牌发牌的过程,牌面由花色色和数字(包括J,Q,K,A字
- 一.抽象类(一)概念 在继承的层次结构中,每个新的子类都使类变得更加明确和具体。如果从一个子类向父类
- 前言学习Java和Android将近一年的时间了,期间的成果应该就是独立完成了一个Android客户端,并且保证了其在主线版本的稳定性。期间
- 先来看一个很简单的核心图片缩放方法:public static Bitmap scale(Bitmap bitmap, float scal
- 这篇文章主要介绍了如何实现java Iterator迭代器功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,
- 一.数组的三种声明方式总结public class WhatEver { public static void main(Str
- Android LayoutInflater加载布局详解对于有一定Android开发经验的同学来说,一定使用过LayoutInf
- 本章先讲解Java随机数的几种产生方式,然后通过示例对其进行演示。广义上讲,Java中的随机数的有三种产生方式:(01). 通过System
- 开发过程中经常用到加载圈,特别是车机开发由于外设不同很多操作响应的等待时长经常要用到不同的加载圈。首先,直接上菊花效果图,这是我直接从项目里
- 首先需要一个自定义view用来签字使用,可以修改颜色和画笔的粗细,可以擦拭重新画package com.android.tcm.view;i
- 使用线程池的好处1、降低资源消耗可以重复利用已创建的线程降低线程创建和销毁造成的消耗。2、提高响应速度当任务到达时,任务可以不需要等到线程创
- 公钥加密算法,也就是 非对称加密算法,这种算法加密和解密的密码不一样,一个是公钥,另一个是私钥:公钥和私钥成对出现公开的密钥叫公钥,只有自己
- 下面通过代码看下springboot 跨域配置类,代码如下所示:ackage org.fh.config;import java.io.IO
- 好几年没写博客了,很多知识不记是真的会忘记,以后还是保持写博客的习惯吧。坚持不一定成功,但放弃一定很舒服!(开玩笑(#^.^#))回归正题,
- 我就废话不多说了,大家还是直接看代码吧~Caused by: java.net.SocketException: Software caus
- 前言:在工作中一次排查慢接口时,查到了一个函数耗时较长,最终定位到是通过 List 去重导致的。由于测试环境还有线上早期数据较少,这个接口的
- 提示出现unresolved external symbol _main搜了下找了下原因如下在创建MFC项目时
- 前言继承是面向对象语法的三大特征之一。继承可以降低代码编写的冗余度,提高编程的效率。通过继承,子类获得了父类的成员变量和方法。一个子类如何继
- file.mkdir()创建单级文件夹,file.mkdirs()创建多级文件夹,file.createNewFile()创建的是一个文件。