Socket结合线程池使用实现客户端和服务端通信demo
作者:Q.E.D 发布时间:2023-01-21 20:23:50
引导语
Socket 面试最终题一般都是让你写一个简单的客户端和服务端通信的例子,本文就带大家一起来写这个 demo。
1、要求
可以使用 Socket 和 ServiceSocket 以及其它 API;
写一个客户端和服务端之间 TCP 通信的例子;
服务端处理任务需要异步处理;
因为服务端处理能力很弱,只能同时处理 5 个请求,当第六个请求到达服务器时,需要服务器返回明确的错误信息:服务器太忙了,请稍后重试~。
需求比较简单,唯一复杂的地方在于第四点,我们需要对客户端的请求量进行控制,首先我们需要确认的是,我们是无法控制客户端发送的请求数的,所以我们只能从服务端进行改造,比如从服务端进行限流。
有的同学可能很快想到,我们应该使用 ServerSocket 的 backlog 的属性,把其设置成 5,但我们在上一章中说到 backlog 并不能准确代表限制的客户端连接数,而且我们还要求服务端返回具体的错误信息,即使 backlog 生效,也只会返回固定的错误信息,不是我们定制的错误信息。
我们好好想想,线程池似乎可以做这个事情,我们可以把线程池的 coreSize 和 maxSize 都设置成 4,把队列大小设置成 1,这样服务端每次收到请求后,会先判断一下线程池中的队列有没有数据,如果有的话,说明当前服务器已经马上就要处理第五个请求了,当前请求就是第六个请求,应该被拒绝。
正好线程池的加入也可以满足第三点,服务端的任务可以异步执行。
2、客户端代码
客户端的代码比较简单,直接向服务器请求数据即可,代码如下:
public class SocketClient {
private static final Integer SIZE = 1024;
private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,
365L,
TimeUnit.DAYS,
new LinkedBlockingQueue<>(400));
@Test
public void test() throws InterruptedException {
// 模拟客户端同时向服务端发送 6 条消息
for (int i = 0; i < 6; i++) {
socketPoll.submit(() -> {
send("localhost", 7007, "nihao");
});
}
Thread.sleep(1000000000);
}
/**
* 发送tcp
*
* @param domainName 域名
* @param port 端口
* @param content 发送内容
*/
public static String send(String domainName, int port, String content) {
log.info("客户端开始运行");
Socket socket = null;
OutputStream outputStream = null;
InputStreamReader isr = null;
BufferedReader br = null;
InputStream is = null;
StringBuffer response = null;
try {
if (StringUtils.isBlank(domainName)) {
return null;
}
// 无参构造器初始化 Socket,默认底层协议是 TCP
socket = new Socket();
socket.setReuseAddress(true);
// 客户端准备连接服务端,设置超时时间 10 秒
socket.connect(new InetSocketAddress(domainName, port), 10000);
log.info("TCPClient 成功和服务端建立连接");
// 准备发送消息给服务端
outputStream = socket.getOutputStream();
// 设置 UTF 编码,防止乱码
byte[] bytes = content.getBytes(Charset.forName("UTF-8"));
// 输出字节码
segmentWrite(bytes, outputStream);
// 关闭输出
socket.shutdownOutput();
log.info("TCPClient 发送内容为 {}",content);
// 等待服务端的返回
socket.setSoTimeout(50000);//50秒还没有得到数据,直接断开连接
// 得到服务端的返回流
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
// 从流中读取返回值
response = segmentRead(br);
// 关闭输入流
socket.shutdownInput();
//关闭各种流和套接字
close(socket, outputStream, isr, br, is);
log.info("TCPClient 接受到服务端返回的内容为 {}",response);
return response.toString();
} catch (ConnectException e) {
log.error("TCPClient-send socket连接失败", e);
throw new RuntimeException("socket连接失败");
} catch (Exception e) {
log.error("TCPClient-send unkown errror", e);
throw new RuntimeException("socket连接失败");
} finally {
try {
close(socket, outputStream, isr, br, is);
} catch (Exception e) {
// do nothing
}
}
}
/**
* 关闭各种流
*
* @param socket
* @param outputStream
* @param isr
* @param br
* @param is
* @throws IOException
*/
public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,
BufferedReader br, InputStream is) throws IOException {
if (null != socket && !socket.isClosed()) {
try {
socket.shutdownOutput();
} catch (Exception e) {
}
try {
socket.shutdownInput();
} catch (Exception e) {
}
try {
socket.close();
} catch (Exception e) {
}
}
if (null != outputStream) {
outputStream.close();
}
if (null != br) {
br.close();
}
if (null != isr) {
isr.close();
}
if (null != is) {
is.close();
}
}
/**
* 分段读
*
* @param br
* @throws IOException
*/
public static StringBuffer segmentRead(BufferedReader br) throws IOException {
StringBuffer sb = new StringBuffer();
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
return sb;
}
/**
* 分段写
*
* @param bytes
* @param outputStream
* @throws IOException
*/
public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {
int length = bytes.length;
int start, end = 0;
for (int i = 0; end != bytes.length; i++) {
start = i == 0 ? 0 : i * SIZE;
end = length > SIZE ? start + SIZE : bytes.length;
length -= SIZE;
outputStream.write(bytes, start, end - start);
outputStream.flush();
}
}
}
客户端代码中我们也用到了线程池,主要是为了并发模拟客户端一次性发送 6 个请求,按照预期服务端在处理第六个请求的时候,会返回特定的错误信息给客户端。
以上代码主要方法是 send 方法,主要处理像服务端发送数据,并处理服务端的响应。
3、服务端代码
服务端的逻辑分成两个部分,第一部分是控制客户端的请求个数,当超过服务端的能力时,拒绝新的请求,当服务端能力可响应时,放入新的请求,第二部分是服务端任务的执行逻辑。
3.1、对客户端请求进行控制
public class SocketServiceStart {
/**
* 服务端的线程池,两个作用
* 1:让服务端的任务可以异步执行
* 2:管理可同时处理的服务端的请求数
*/
private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,
365L,
TimeUnit.DAYS,
new LinkedBlockingQueue<>(
1));
@Test
public void test(){
start();
}
/**
* 启动服务端
*/
public static final void start() {
log.info("SocketServiceStart 服务端开始启动");
try {
// backlog serviceSocket处理阻塞时,客户端最大的可创建连接数,超过客户端连接不上
// 当线程池能力处理满了之后,我们希望尽量阻塞客户端的连接
// ServerSocket serverSocket = new ServerSocket(7007,1,null);
// 初始化服务端
ServerSocket serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
// serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));
serverSocket.bind(new InetSocketAddress("localhost", 7007));
log.info("SocketServiceStart 服务端启动成功");
// 自旋,让客户端一直在取客户端的请求,如果客户端暂时没有请求,会一直阻塞
while (true) {
// 接受客户端的请求
Socket socket = serverSocket.accept();
// 如果队列中有数据了,说明服务端已经到了并发处理的极限了,此时需要返回客户端有意义的信息
if (collectPoll.getQueue().size() >= 1) {
log.info("SocketServiceStart 服务端处理能力到顶,需要控制客户端的请求");
//返回处理结果给客户端
rejectRequest(socket);
continue;
}
try {
// 异步处理客户端提交上来的任务
collectPoll.submit(new SocketService(socket));
} catch (Exception e) {
socket.close();
}
}
} catch (Exception e) {
log.error("SocketServiceStart - start error", e);
throw new RuntimeException(e);
} catch (Throwable e) {
log.error("SocketServiceStart - start error", e);
throw new RuntimeException(e);
}
}
// 返回特定的错误码给客户端
public static void rejectRequest(Socket socket) throws IOException {
OutputStream outputStream = null;
try{
outputStream = socket.getOutputStream();
byte[] bytes = "服务器太忙了,请稍后重试~".getBytes(Charset.forName("UTF-8"));
SocketClient.segmentWrite(bytes, outputStream);
socket.shutdownOutput();
}finally {
//关闭流
SocketClient.close(socket,outputStream,null,null,null);
}
}
}
我们使用 collectPoll.getQueue().size() >= 1 来判断目前服务端是否已经到达处理的极限了,如果队列中有一个任务正在排队,说明当前服务端已经超负荷运行了,新的请求应该拒绝掉,如果队列中没有数据,说明服务端还可以接受新的请求。
以上代码注释详细,就不累赘说了。
3.2、服务端任务的处理逻辑
服务端的处理逻辑比较简单,主要步骤是:从客户端的 Socket 中读取输入,进行处理,把响应返回给客户端。
我们使用线程沉睡 2 秒来模拟服务端的处理逻辑,代码如下:
public class SocketService implements Runnable {
private Socket socket;
public SocketService() {
}
public SocketService(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
log.info("SocketService 服务端任务开始执行");
OutputStream outputStream = null;
InputStream is = null;
InputStreamReader isr = null;
BufferedReader br = null;
try {
//接受消息
socket.setSoTimeout(10000);// 10秒还没有得到数据,直接断开连接
is = socket.getInputStream();
isr = new InputStreamReader(is,"UTF-8");
br = new BufferedReader(isr);
StringBuffer sb = SocketClient.segmentRead(br);
socket.shutdownInput();
log.info("SocketService accept info is {}", sb.toString());
//服务端处理 模拟服务端处理耗时
Thread.sleep(2000);
String response = sb.toString();
//返回处理结果给客户端
outputStream = socket.getOutputStream();
byte[] bytes = response.getBytes(Charset.forName("UTF-8"));
SocketClient.segmentWrite(bytes, outputStream);
socket.shutdownOutput();
//关闭流
SocketClient.close(socket,outputStream,isr,br,is);
log.info("SocketService 服务端任务执行完成");
} catch (IOException e) {
log.error("SocketService IOException", e);
} catch (Exception e) {
log.error("SocketService Exception", e);
} finally {
try {
SocketClient.close(socket,outputStream,isr,br,is);
} catch (IOException e) {
log.error("SocketService IOException", e);
}
}
}
}
4、测试
测试的时候,我们必须先启动服务端,然后再启动客户端,首先我们启动服务端,打印日志如下:
接着我们启动客户端,打印日志如下:
我们最后看一下服务端的运行日志:
从以上运行结果中,我们可以看出得出的结果是符合我们预期的,服务端在请求高峰时,能够并发处理5个请求,其余请求可以用正确的提示进行拒绝。
5、总结
所以代码集中在 SocketClient、SocketServiceStart、SocketService 中,启动的顺序为先启动 SocketServiceStart,后运行 SocketClient,感兴趣的同学可以自己 debug 下,加深印象。
来源:https://blog.csdn.net/qq_34272760/article/details/120662160
猜你喜欢
- 先给大家介绍下Java获取上月份最后一天日期8位。代码如下所示:/** * 获取上个月的最后一天23点59分59
- 实体例子public class Person { private String name; &nb
- 首先我们常用的注解包括@Entity、@Table、@Id、@IdClass、@GeneratedValue、@Basic、@Transie
- 前言 短时间提升自己最快的手段就是背面试题,最近总结了Java常用的面试题,分享给大家,希望大家都能圆梦大厂,加油,我命由我不由天
- 这个问题是我自己开发中遇到的问题 数据库使用的是mysql5.6 字段名称为checkingTime 类
- 【题目】 汉诺塔问题比较经典,这里修改一下游戏规则:现在限制不能从最左侧的塔直接移动到最右侧,也不能从最右侧直接移动到最左侧,而是必须经过中
- 在实际业务中,当后台数据发生变化,客户端能够实时的收到通知,而不是由用户主动的进行页面刷新才能查看,这将是一个非常人性化的设计。有没有那么一
- 这篇文章主要介绍了如何通过Java实现时间轴过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友
- 本文实例讲述了Spring和Hibernate的整合操作。分享给大家供大家参考,具体如下:一 web配置<?xml version=&
- 获得redis所有的key-value运行结果:redis配置文件需要序列化@Bean public RedisT
- maven项目中在pom.xml中依赖2个jar包,其他的spring的jar包省略:<dependency> &
- 相关文章:Java使用POI导出Excel(一):单sheetJava使用POI导出Excel(二):多个sheet相信在大部分的web项目
- 问题引出:最近开了新项目,项目中用到了数据字典,列表查询数据返回的时候需要手动将code转换为name,到前台展示。项目经理表示可以封装一个
- 今天是解决报错的一天,首先在操作Springboot中的时候,有些朋友的yml显示的不是绿叶的图标,或者是配置了之后不生效的问题。第一个解决
- 从这章开始,会介绍几个常用的函数式接口工具,首先先来看下这个大家族:首先从Function接口开始介绍一. 概述该接口顾名思义,函数的意思,
- 1. 前言Spring提供了xml、注解、JavaConfig多种方式来配置bean,不论何种方式,Spring最终都会将bean封装成Be
- 本文实例展示的是一个自定义的定时器组件,有别于.NET Framework里面提供的几个Timer。首先说说该组件开发背景,发现现在手头上的
- 1、在POM.xml文件下添加如下代码;注意:version、configuration、executions三个标签是我后来查找添加的,网
- 本文实例讲述了C#实现读写ini文件类。分享给大家供大家参考。具体如下:这个C#类封装了对INI配置文件进行操作所需的各种函数,包括读取键值
- 1、概述本文通过手动实现迭代器来了解foreach语句的本质。2、使用foreach语句遍历集合在C#中,使用foreach语句来遍历集合。