java基于netty NIO的简单聊天室的实现
作者:Alexwym 发布时间:2023-11-18 15:29:43
一、为何要使用netty开发
由于之前已经用Java中的socket写过一版简单的聊天室,这里就不再对聊天室的具体架构进行细致的介绍了,主要关注于使用netty框架重构后带来的改变。对聊天室不了解的同学可以先看下我的博客(《JAVA简单聊天室的实现》)
本篇博客所使用的netty版本为4.1.36,完整工程已上传到Github(https://github.com/Alexlingl/Chatroom),其中lib文件夹下有相应的netty jar包和source包,自行导入即可。
1、为何要重构
之前的聊天室是基于Java原生socket实现的,socket的处理机制属于BIO模型,也就是阻塞IO模型。对于每一个客户端的连接我们都需要启动一个线程来处理,并且该线程会一直阻塞在读取用户数据上面。如此一来,一旦有大量的客户端并发连接我们的服务器,服务器将难以承受。之前用JMeter测试过,单纯使用Java原生socket开发的服务器所能支持的最大并发在2300左右。虽然采用线程池的策略可以在一定程度上提升最大并发数,但也无法超过1W。因此我们需要对其进行重构,使其能够具有更高的性能。
2、为何使用netty框架
使用netty框架主要还是为了提升代码的开发速度,并且减少代码维护成本。使用netty框架开发的程序在复杂度上比使用Java原生NIO类库开发的要小很多。具体可以看下我之前关于解决C10k问题的系列文章,里面有具体的代码。
3、为何netty框架只实现了NIO而没有AIO
前面在解决C10问题时,探究过NIO和AIO的区别,并且使用Java所提供的类库实现了两个小程序,理论上来说AIO性能明显要比NIO高,那为什么netty使用了NIO而不是AIO呢?
官方说法如下:
We obviously did not consider Windows as a serious platform so far, and that's why we were neglecting NIO.2 AIO API which was implemented using IOCP on Windows. (On Linux, it wasn't any faster because it was using the same OS facility - epoll.)
大意就是,windows上面有IOCP来支持AIO的实现,因此AIO的性能会比NIO好。而Linux上面不管是NIO还是AIO,底层都是用epoll实现的,性能差距不大。然而当下绝大部分的服务器还是建立在Linux上,因此没必要使用AIO(使用AIO反而会增加代码的复杂度,增大维护成本)。
二、基于netty NIO的处理模型
1、服务器的类关系
(1)、SubreqServer:创建两个NIO线程组,一个用来监听处理客户端的连接请求,一个用来监听客户端的消息。同时实例化一个ServerBootStrap启动类的对象来启动两个NIO线程组,并且配置必要的参数。
(2)、ChannelInitializer:初始化SocketChannel管道的各项参数,主要有指定解码器和编码器,并指明管道的处理类
(3)、SubreqServerHandler:SocketChannel管道的处理类,负责处理来自客户端的消息
2、客户端的类关系
(1)、SubreqClient:创建一个NIO线程组,用来监听客户端的消息。同时实例化一个ServerBootStrap启动类的对象来启动这个NIO线程组,并且配置必要的参数。最后让主程序阻塞在监听客户端的键盘输入。
(2)、ChannelInitializer:初始化SocketChannel管道的各项参数,主要有指定解码器和编码器,并指明管道的处理类
(3)、SubreqClientHandler:SocketChannel管道的处理类,负责处理来自服务器的消息
三、所涉及类库的源码解读
1、ChannelHandlerAdapter(用来对channel的注册和注销做出反应的类):
(1)、功能
用来实现当用户上线或下线时,通知其他在线的用户。
(2)、类定义
它是ChannelHandler的框架实现
(3)、HandlerAdded()和HandlerRemoved()方法
当有一个channel被注册后将会调用HandlerAdded(),而当有一个channel被注销后将调用HandlerRemoved()方法。并且根据注释我们知道,这两个方法默认不做任何处理,它希望由继承的子类自己去写相应的处理实现。
2、SimpleChannelInboundHandler(用来对接收的消息做出反应)
(1)、功能
用来实现当接收到消息时做出相应反应,如果是服务端,那么将当前消息转发给其他在线的客户端;如果是客户端,就将消息简单地打印出来。
(2)、类定义
这个类是一个泛型类,它只能用于处理一种具体类型的消息。注释中给出一种使用方法,StringHandler继承了SimpleChannelInboundHandler,并且指定泛型变量为String,因此这个继承类只能用于String类型消息的处理。
(2)、channelRead(ChannelHandlerContext ctx, Object msg)
这个方法主要用来处理类型为Object的消息,也就是所有消息。
首先当接收到msg时,先使用accpetInboundMessage()方法来判断该消息是否可以处理。我们来看下该方法的实现。
如果接收到的这个消息应该被处理就返回true,如果它应当被传到ChannelPipeLine的下一个ChannelInboundHandler就返回false。
我们继续来看下ChannelRead()方法。如果接受到的消息可以处理时。它将对消息进行强制转化,将其转为I,并且调用channelRead0()。它是一个抽象类,因此我们在继承SimpleChannelInboundHandler类时,需要根据自己的实际去实现这个方法。
四、关键的代码实现
1、server端
(1)、SubreqServer类
package nettyserverv1;
import java.util.ArrayList;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* Created by linguolong on 2019/05/08.
* Chatroom server built using netty framework
*/
public class SubreqServer {
//保存已注册的用户信息
public static ArrayList<UserInfo> userlist = new ArrayList<UserInfo>();
//自动生成注册用户
static{
for(int i=0;i<10;i++){
UserInfo user=new UserInfo();
user.setUserID("123"+i);
user.setUserName("user"+i);
user.setPassword("pwd"+i);
userlist.add(user);
}
}
public void bind(int port) throws Exception{
//配置服务端NIO 线程组
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
try {
server.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 解码器: 构造器传入了两个参数:
* #1 单个对象序列化后最大字节长度,这是设置是1M;
* #2 类解析器: weakCachingConcurrentResolver创建线程安全的WeakReferenceMa对类加载器进行缓存,
* 支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏.
*/
ch.pipeline().addLast(new ObjectDecoder(1024*1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubreqServerHandler());
}
});
System.out.println("Start the server success");
//绑定端口, 同步等待成功
ChannelFuture future = server.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
SubreqServer server = new SubreqServer();
try {
server.bind(18888);
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)、SubreqServerHandler类
package nettyserverv1;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* Created by linguolong on 2019/05/08.
* Chatroom client built using netty framework
*/
public class SubreqServerHandler extends SimpleChannelInboundHandler<String>{
//新建一个channelGroup,用于存放连接的channel
public static ChannelGroup online_channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerRemoved(ChannelHandlerContext ctx){
Channel leave_channel = ctx.channel();
for (Channel channel : online_channels) {
if (channel != leave_channel){
channel.writeAndFlush("[用户 " + leave_channel.remoteAddress() + "]下线了!\n");
}
}
System.out.println(ctx.channel().id()+"下线了");
//把刚下线的channel移除出在线用户队列
online_channels.remove(leave_channel);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断用户是否已经登录
if(!check_login(ctx)){
//未登录则进行登录验证
check_identity(ctx,msg);
}else{
//已登录则进行消息转发
Channel coming_channel = ctx.channel();
for (Channel channel : online_channels) {
if (channel != coming_channel){
channel.writeAndFlush("[用户 " + coming_channel.remoteAddress() + "]: " + msg );
} else {
channel.writeAndFlush("[我]: " + msg);
}
}
}
}
//用户信息验证,检查用户ID和密码是否正确
public void check_identity(ChannelHandlerContext ctx, Object msg){
UserInfo req = (UserInfo) msg;
System.out.println("service receive client login req :{"+ req.toString() +"}");
boolean login_flag = false;
for(int i=0;i<SubreqServer.userlist.size();i++){
if( SubreqServer.userlist.get(i).getUserID().equalsIgnoreCase(req.getUserID())&&(SubreqServer.userlist.get(i).getPassword().equals(req.getPassword()))){
login_flag=true;
}
}
if(login_flag){
System.out.println("账号"+req.getUserID()+"登录成功");
ctx.writeAndFlush("您已登录成功~\n");
//将当前的通道加入在线队列中
online_channels.add(ctx.channel());
}
else{
System.out.println("账号"+req.getUserID()+"登录失败");
ctx.writeAndFlush("登录失败!");
//关闭连接
ctx.close();
online_channels.remove(ctx.channel());
}
}
//判断用户是否已经在线
public boolean check_login(ChannelHandlerContext ctx){
boolean online_flag = false;
for(int i=0;i<online_channels.size();i++){
if(online_channels.contains(ctx.channel())){
online_flag = true;
}
}
return online_flag;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
//ChannelRead0()只能处理类型为String的消息,因此我们这里不能用ChannelRead0()这个方法,这里的第二个参数类型使用了泛型
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
}
}
(3)、用户信息UserInfo类
package nettyserverv1;
import java.io.Serializable;
/**
* Created by linguolong on 2019/05/08.
* Chatroom User Infomation
*/
public class UserInfo implements Serializable{
private String userID;
private String userName;
private String password;
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "SubscribeReq: [messageID]:"+ userID + " [userName]:" +userName
+ " [password]:" +password;
}
}
2、Client端
(1)、SubreqClient类
package nettyclientv1;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* Created by linguolong on 2019/05/08.
* Chatroom client built using netty framework
*/
public class SubreqClient {
public void connect(int port, String host) throws Exception{
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加解码器
ch.pipeline().addLast(new ObjectDecoder(1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );
//添加编码器
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubreqClientHandler());
}
});
//异步获取当前已连接的channel
Channel now_channel = client.connect(host,port).sync().channel();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
//异步等待客户端连接端口关闭
// now_channel.closeFuture().sync();
//使客户端一直处于输入状态,直到读取到"bye"
String message = " ";
while (true) {
//读到bye时退出
if(message.equals("bye")) break;
message = reader.readLine();
now_channel.writeAndFlush(message+"\n");
}
//读到了"bye"字符串,主动断开连接
now_channel.close();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
SubreqClient client = new SubreqClient();
try {
client.connect(18888, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)、SubreqClientHandler类
package nettyclientv1;
import nettyserverv1.UserInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by linguolong on 2019/05/08.
* Chatroom client built using netty framework
*/
public class SubreqClientHandler extends ChannelInboundHandlerAdapter{
public SubreqClientHandler() {
}
/**
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(subReq("1231","user1","pwd1"));
ctx.flush();
}
private UserInfo subReq(String id,String userName,String password){
UserInfo req = new UserInfo();
req.setUserID(id);
req.setUserName(userName);
req.setPassword(password);
return req;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.print(msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
五、说明
1、登录功能的实现:当客户端刚连接上服务器时,便构造一个UserInfo对象,对对象进行编码后发送给服务器。服务器接收到后对其进行解码,验证相应的账户ID和密码是否正确。
来源:https://blog.csdn.net/Alexwym/article/details/90047132


猜你喜欢
- 本文实例讲述了Android编程实现仿优酷旋转菜单效果。分享给大家供大家参考,具体如下:首先,看下效果:不好意思,不会制作动态图片,只好上传
- 使用idea进行JavaWeb开发时,在前端与后台交互常常出现乱码问题,包括日志/控制台输出乱码,参数乱码等问题,归根结底是编码格式不对,解
- 前言项目中时不时遇到查字典表等数据,只需要返回数据,不需要写其他业务,每个字典表可能都需要写一个接口给前端调用,比较麻烦,所以采用下面这种方
- 1.前置准备默认服务器上的hadoop服务已经启动本地如果是windows环境,需要本地配置下hadoop的环境变量本地配置hadoop的环
- 准备数据data class ContactEntity( val letter: Char, &n
- 在 Kotlin 中,reduce() 和 fold() 是函数式编程中常用的高阶函数。它们都是对集合中的元素进行聚合操作的函数,将一个集合
- 我们在日常开发中,经常会遇到类似的场景:当要做一件事儿的时候,这件事儿的步骤是固定好的,但是每一个步骤的具体实现方式是不一定的。通常,遇到这
- 本文实例讲述了C#实现IP摄像头的方法。分享给大家供大家参考。具体实现方法如下:#region IP摄像头代码/// <summary
- 最近开发过程中遇到了一个问题,之前没有太注意,这里记录一下。我用的SpringBoot版本是2.0.5,在跟前端联调的时候,有个请求因为用户
- 我们知道,使用nginx作为文件下载服务器,可以极大地降低对后端Java服务器的负载冲击,但是nginx本身并不提供授权控制,因此好的方案是
- 这篇文章主要介绍了Spring配置文件如何使用${username},文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习
- 有时候我们需要判断栈顶的应用是否是我们的应用,于是获取栈顶的应用包名的需求就出现了。在android5.0之前,系统提供了一套API可以实现
- 本文实例为大家分享了android通过NFC读取卡号的具体代码,供大家参考,具体内容如下1.获取权限<uses-permission
- android studio版本:2021.2.1例程名称:pravicydialog功能:1、启动app后弹窗隐私协议2、屏蔽返回键3、再
- 通俗的来说,Jackson是一个 Java 用来处理 JSON 格式数据的类库,其性能非常好。本文就来针对Jackson的用法做一个较为详细
- 本文实例为大家分享了java动态模拟时钟的具体代码,供大家参考,具体内容如下应用名称:java动态模拟时钟用到的知识:javaGUI,jav
- 个人认为单例模式是设计模式中最简单也是最常用的一种,是对有限资源合理利用的一种方式。这个模式看似简单,但是其中蕴含了关于并发、类加载、序列化
- Android Notification的多种用法总结我们在用手机的时候,如果来了短信,而我们没有点击查看的话,是不是在手机的最上边的状态栏
- 前言BeanPostProcessor是一个工厂钩子,允许Spring框架在新创建Bean实例时对其进行定制化修改。例如:通过检查其标注的接
- 问题怎么配置springBoot 内置tomcat,才能使得自己的服务效率更高呢?基础配置Spring Boot 能支持的最大并发量主要看其