Java实现非阻塞式服务器的示例代码
作者:低吟不作语 发布时间:2022-01-29 19:29:11
1.创建阻塞的服务器
当 ServerSocketChannel 与 SockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private ExecutorService executorService; //线程池
private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
public EchoServer() throws IOException {
//创建一个线程池
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//把服务器进程与一个本地端口绑定
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服务器启动");
}
public void service() {
while (true) {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
//处理客户连接
executorService.execute(new Handler(socketChannel));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws IOException {
new EchoServer().service();
}
//处理客户连按
class Handler implements Runnable {
private SocketChannel socketChannel;
public Handler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
handle(socketChannel);
}
public void handle(SocketChannel socketChannel) {
try {
//获得与socketChannel关联的Socket对象
Socket socket = socketChannel.socket();
System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
pw.println(echo(msg));
if (msg.equals("bye")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(socketChannel != null) {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream socketOut = socket.getOutputStream();
return new PrintWriter(socketOut,true);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn));
}
public String echo(String msg) {
return "echo:" + msg;
}
}
2.创建非阻塞的服务器
在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:
接收客户的连接
接收客户发送的数据
向客户发回响应数据
EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1层while循环
while(selector.select() > 0) {
//获得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2层while循环
while (it.hasNext()) {
SelectionKey key = null;
//处理SelectionKey
try {
//取出一个SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey从Selector 的selected-key 集合中删除
it.remove();
1f (key.isAcceptable()) { 处理接收连接就绪事件; }
if (key.isReadable()) { 处理读就绪水件; }
if (key.isWritable()) { 处理写就绪事件; }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使这个SelectionKey失效
key.cancel();
//关闭与这个SelectionKey关联的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
首先由
ServerSocketChannel
向Selector
注册接收连接就绪事件,如果Selector
监控到该事件发生,就会把相应的SelectionKey
对象加入selected-keys
集合第一层 while 循环,不断询问
Selector
已经发生的事件,select()
方法返回当前相关事件已经发生的SelectionKey
的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相关事件已经发生的SelectionKey
对象第二层 while 循环,从
selected-keys
集合中依次取出每个SelectionKey
对象并从集合中删除,,然后调用isAcceptable()
、isReadable()
和isWritable()
方法判断到底是哪种事件发生了,从而做出相应的处理
2.1处理接收连接就绪事件
if (key.isAcceptable()) {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false);
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
2.2处理读就绪事件
public void receive(SelectionKey key) throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的极限设为容量
buffer.limit(buffer.capacity());
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff);
}
2.3处理写就绪事件
public void send(SelectionKey key) throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer);
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置设为temp的极限
buffer.position(temp.limit()):
//删除buffer已经处理的数据
buffer.compact();
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
完整代码如下:
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1层while循环
while(selector.select() > 0) {
//获得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2层while循环
while (it.hasNext()) {
SelectionKey key = null;
//处理SelectionKey
try {
//取出一个SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey从Selector 的selected-key 集合中删除
it.remove();
1f (key.isAcceptable()) {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false);
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
if (key.isReadable()) { receive(key); }
if (key.isWritable()) { send(key); }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使这个SelectionKey失效
key.cancel();
//关闭与这个SelectionKey关联的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
public void receive(SelectionKey key) throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的极限设为容量
buffer.limit(buffer.capacity());
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff);
}
public void send(SelectionKey key) throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer);
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置设为temp的极限
buffer.position(temp.limit()):
//删除buffer已经处理的数据
buffer.compact();
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
//解码
public String decode(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toStrinq();
}
//编码
public ByteBuffer encode(String str) {
return charset.encode(str);
}
public static void main(String args[])throws Exception {
EchoServer server = new EchoServer();
server.service();
}
}
3.阻塞模式与非阻塞模式混合使用
使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void accept() {
while(true) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
synchronized(gate) {
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
} catch(IOException e) {
e.printStackTrace();
}
}
}
private Object gate=new Object();
public void service() throws IOException {
while(true) {
synchronized(gate){}
int n = selector.select();
if(n == 0) continue;
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
it.remove();
if (key.isReadable()) {
receive(key);
}
if (key.isWritable()) {
send(key);
}
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
key.cancel();
key.channel().close();
}
} catch(Exception ex) { e.printStackTrace(); }
}
}
}
}
public void receive(SelectionKey key) throws IOException {
...
}
public void send(SelectionKey key) throws IOException {
...
}
public String decode(ByteBuffer buffer) {
...
}
public ByteBuffer encode(String str) {
...
}
public static void main(String args[])throws Exception {
final EchoServer server = new EchoServer();
Thread accept = new Thread() {
public void run() {
server.accept();
}
};
accept.start();
server.service();
}
}
注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁
导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然
来源:https://www.cnblogs.com/Yee-Q/p/17416992.html


猜你喜欢
- 一、概念 java对象序列化的意思就是将对象的状态转化成字节流,以后
- 在Android中,在非主线程中更新UI控件是不安全的,app在运行时会直接Crash,所以当我们需要在非主线程中更新UI控件,那么就需要用
- 接上回上一篇我们简单介绍了基于SpringBoot实现简单的Web开发,本节来看Web开发中必不可少的内容—&m
- 代码如下一、创建 CountdownTimer.xaml 继承ContentControl代码如下。using?System;us
- 一、背景在我们编写程序的过程中,程序中可能随时发生各种异常,那么我们如何优雅的处理各种异常呢?二、需求1、拦截系统中部分异常,返回自定义的响
- 一个activity就好比一个网页,此文章讲解怎样创建一个activity并且实现跳转!一、学习创建Activity1、新建一个java类,
- RestTemplate第一次请求响应速度较慢问题使用RestTemplate请求微信的接口发现第一次请求需要8秒左右的时间,查阅了JDK资
- Android 自定义按钮点击事件和长按事件对比一个按钮同时实现点击和长按事件,有时候会有冲突,我们针对这一现象来自定义按钮来区
- 前言前面说到在我们应对高并发的场景,请求量过于大的情况下给我们服务器的压力很多,造成缓存穿透、击穿、雪崩,那么我们采用布隆过滤器,有兴趣的小
- 本文实例为大家分享了android自定义滚动上下回弹scollView的具体代码,供大家参考,具体内容如下这是一个自定义view,在xml布
- 在考虑类初始化时,我们都知道进行子类初始化时,如果父类没有初始化要先初始化子类。然而事情并没有一句话这么简单。首先看看Java中初始化触发的
- 本文实例讲述了C#中Arraylist的sort函数用法。分享给大家供大家参考。具体如下:ArrayList的sort函数有几种比较常用的重
- 这篇文章主要介绍了SpringBoot跨域Access-Control-Allow-Origin实现解析,文中通过示例代码介绍的非常详细,对
- Spring Data Jpa复杂查询总结只是做一个总结所以就不多说废话了实体类@Entity@Table(name = "t_h
- 进程进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指
- 举例说明:1、有一个200*200像素的窗口,想要把它放在800*600像素的屏幕中间,屏幕的位置应是(800/2,600/2)=(400,
- 本文实例讲述了C#提取网页中超链接link和text部分的方法。分享给大家供大家参考,具体如下:string s = "..&qu
- 首先看如下代码示例:System.out.println(0.05 + 0.01);System.out.println(0.05 - 0.
- Android中手机震动的设置(Vibrator)的步骤: a、通过系统服务获得手机震动服务,Vibrator vibrator = (Vi
- 本文实例讲述了C#显示文件夹下所有图片文件的方法。分享给大家供大家参考。具体实现方法如下:<%@ Page Language=&quo