Java实现手写线程池的示例代码
作者:一无是处的研究僧 发布时间:2022-01-09 13:08:24
前言
在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题。多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作。在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮助大家理解线程池的核心原理!!!
线程池给我们提供的功能
我们首先来看一个使用线程池的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo01 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " print " + i);
}
}
});
}
}
}
在上面的例子当中,我们使用Executors.newFixedThreadPool去生成来一个固定线程数目的线程池,在上面的代码当中我们是使用5个线程,然后通过execute方法不断的去向线程池当中提交任务,大致流程如下图所示:
线程池通过execute函数不断的往线程池当中的任务队列加入任务,而线程池当中的线程会不断的从任务队列当中取出任务,然后进行执行,然后继续取任务,继续执行....,线程的执行过程如下:
while (true) {
Runnable runnable = taskQueue.take(); // 从任务队列当中取出任务
runnable.run(); // 执行任务
}
根据上面所谈到的内容,现在我们的需求很清晰了,首先我们需要有一个队列去存储我们所需要的任务,然后需要开启多个线程不断的去任务队列当中取出任务,然后进行执行,然后重复取任务执行任务的操作。
工具介绍
在我们前面提到的线程池实现的原理当中有一个非常重要的数据结构,就是ArrayBlockingQueue阻塞队列,它是一个并发安全的数据结构,我们首先先简单介绍一下这个数据结构的使用方法。(如果你想深入了解阻塞队列的实现原理,可以参考这篇文章JDK数组阻塞队列源码剖析)
我们主要用的是ArrayBlockingQueue的下面两个方法:
put函数,这个函数是往线程当中加入数据的。我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。
take函数,从队列当中取出数据,但是当队列为空的时候需要将调用这个方法的线程阻塞。当队列当中有数据的时候,就可以从队列当中取出数据。
需要注意的是,如果一个线程被上面两个任何一个线程阻塞之后,可以调用对应线程的interrupt方法终止线程的执行,同时还会抛出一个异常。
下面是一份测试代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class QueueTest {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5); // 队列的容量为5
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("数据 " + i + "被加入到队列当中");
} catch (InterruptedException e) {
System.out.println("出现了中断异常");
// 如果出现中断异常 则退出 线程就不会一直在 put 方法被挂起了
return;
}finally {
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
上面代码输出结果:
数据 0被加入到队列当中
数据 1被加入到队列当中
数据 2被加入到队列当中
数据 3被加入到队列当中
数据 4被加入到队列当中
出现了中断异常
上面代码的执行顺序是:
线程thread会将0-4这5个数据加入到队列当中,但是在加入第6个数据的时候,阻塞队列已经满了,因此在加入数据的时候线程thread会被阻塞,然后主线程在休息一秒之后中断了线程thread,然后线程thread发生了中断异常,然后被捕获进入catch代码块,然后函数返回,线程thread就不会一直被阻塞了,这一点在我们后面写线程池非常重要!!!
Worker设计
在前文当中我们已经提到了我们的线程需要不断的去任务队列里面取出任务然后执行,我们设计一个Worker类去做这件事!
首先在类当中肯定需要有一个线程池的任务队列,因为worker需要不断的从阻塞队列当中取出任务进行执行。
我们用一个isStopped变量表示线程是否需要终止了,也就是线程池是否需要关闭,如果线程池需要关闭了,那么线程也应该停止了。
我们还需要有一个变量记录执行任务的线程,因为当我们需要关闭线程池的时候需要等待任务队列当中所有的任务执行完成,那么当所有的任务都执行执行完成的时候,队列肯定是空的,而如果这个时候有线程还去取任务,那么肯定会被阻塞,前面已经提到了ArrayBlockingQueue的使用方法了,我们可以使用这个线程的interrupt的方法去中断这个线程的执行,这个线程会出现异常,然后这个线程捕获这个异常就可以退出了,因此我们需要知道对那个线程执行interrupt方法!
Worker实现的代码如下:
import java.util.concurrent.ArrayBlockingQueue;
public class Worker implements Runnable {
// 用于保存任务的队列
private ArrayBlockingQueue<Runnable> tasks;
// 线程的状态 是否终止
private volatile boolean isStopped;
// 保存执行 run 方法的线程
private volatile Thread thisThread;
public Worker(ArrayBlockingQueue<Runnable> tasks) {
// 这个参数是线程池当中传入的
this.tasks = tasks;
}
@Override
public void run() {
thisThread = Thread.currentThread();
while (!isStopped) {
try {
Runnable task = tasks.take();
task.run();
} catch (InterruptedException e) {
// do nothing
}
}
}
// 注意是其他线程调用这个方法 同时需要注意是 thisThread 这个线程在执行上面的 run 方法
// 其他线程调用 thisThread 的 interrupt 方法之后 thisThread 会出现异常 然后就不会一直阻塞了
// 会判断 isStopped 是否为 true 如果为 true 的话就可以退出 while 循环了
public void stop() {
isStopped = true;
thisThread.interrupt(); // 中断线程 thisThread
}
public boolean isStopped(){
return isStopped;
}
}
线程池设计
首先线程池需要可以指定有多少个线程,阻塞队列的最大长度,因此我们需要有这两个参数。
线程池肯定需要有一个队列去存放通过submit函数提交的任务。
需要有一个变量存储所有的woker,因为线程池关闭的时候需要将这些worker都停下来,也就是调用worker的stop方法。
需要有一个shutDown函数表示关闭线程池。
需要有一个函数能够停止所有线程的执行,因为关闭线程池就是让所有线程的工作停下来。
线程池实现代码:
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
public class MyFixedThreadPool {
// 用于存储任务的阻塞队列
private ArrayBlockingQueue<Runnable> taskQueue;
// 保存线程池当中所有的线程
private ArrayList<Worker> threadLists;
// 线程池是否关闭
private boolean isShutDown;
// 线程池当中的线程数目
private int numThread;
public MyFixedThreadPool(int i) {
this(Runtime.getRuntime().availableProcessors() + 1, 1024);
}
public MyFixedThreadPool(int numThread, int maxTaskNumber) {
this.numThread = numThread;
taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 创建阻塞队列
threadLists = new ArrayList<>();
// 将所有的 worker 都保存下来
for (int i = 0; i < numThread; i++) {
Worker worker = new Worker(taskQueue);
threadLists.add(worker);
}
for (int i = 0; i < threadLists.size(); i++) {
new Thread(threadLists.get(i),
"ThreadPool-Thread-" + i).start(); // 让worker开始工作
}
}
// 停止所有的 worker 这个只在线程池要关闭的时候才会调用
private void stopAllThread() {
for (Worker worker : threadLists) {
worker.stop(); // 调用 worker 的 stop 方法 让正在执行 worker 当中 run 方法的线程停止执行
}
}
public void shutDown() {
// 等待任务队列当中的任务执行完成
while (taskQueue.size() != 0) {
// 如果队列当中还有任务 则让出 CPU 的使用权
Thread.yield();
}
// 在所有的任务都被执行完成之后 停止所有线程的执行
stopAllThread();
}
public void submit(Runnable runnable) {
try {
taskQueue.put(runnable); // 如果任务队列满了, 调用这个方法的线程会被阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试代码:
public class Test {
public static void main(String[] args) {
MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 开启5个线程 任务队列当中最多只能存1024个任务
for (int i = 0; i < 1000000; i++) {
pool.submit(() -> {
System.out.println(Thread.currentThread().getName()); // 提交的任务就是打印线程自己的名字
});
}
pool.shutDown();
}
}
上面的代码是可以正常执行并且结束的,这个输出太长了这里只列出部分输出结果:
ThreadPool-Thread-0
ThreadPool-Thread-4
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-1
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-4
从上面的输出我们可以看见线程池当中只有5个线程,这5个线程在不断从队列当中取出任务然后执行,因为我们可以看到同一个线程的名字输出了多次。
来源:https://www.cnblogs.com/Chang-LeHung/p/16599776.html
猜你喜欢
- 一、JdbcTemplateSpring 框架对 JDBC 进行封装,使用 JdbcTemplate 方便实现对数据库操作二、实战2.1 引
- 前言平时开发经常会用到List等集合操作,在这里做一个小结java集合Collectionjava里面集合分为两大类:List和Set,下面
- java 接口回调实例详解首先官方对接口回调的定义是这样的,所谓回调:就是A类中调用B类中的某个方法C,然后B类中反过来调用A类中的方法D,
- 前言上一篇文章已经介绍了fluent-mybatis项目的构建,文章地址:Java Fluent Mybatis实战之构建项目与代码生成篇上
- synchronized原理在java中,每一个对象有且仅有一个同步锁。这也意味着,同步锁是依赖于对象而存在。当我们调用某对象的synchr
- 昨天实现一个功能,根据文章的id或者别名查找文章。起初采用mybatis的Example进行查询,对参数artName进行判断,如果是纯数字
- 也许很多朋友在学习NIO的时候都会感觉有点吃力,对里面的很多概念都感觉不是那么明朗。在进入Java NIO编程之前,我们今天先来讨论一些比较
- 我们在java中处理字符串的时候,一般会选择String,在python中同样也是作用于字符串。那么我们今天延伸一下它的用法,只使用Stri
- 前提:微信公众平台:注册微信认证的公众号也就是服务号 ,拥有跟高级权限的微信接口。(注册服务号需要一些企业信息,需自己或者公司解决)注: 2
- 1. 前言ResultSetMetaData 叫元数据,是数据库 列对象,以列为单位封装为对象。元数据,指的是其包含列名,列值,列类型,列长
- java读取文件内容,解析Json格式数据一、读取txt文件内容(Json格式数据) public static
- IntelliJ IDEA是广受Java开发者喜爱的工具,其商业版的价格十分昂贵,如下图:现在有机会免费获取IntelliJ IDEA的正版
- 在开发应用过程中,客户端与服务端经常需要进行数据传输,涉及到重要隐私信息时,开发者自然会想到对其进行加密,即使传输过程中被“有心人”截取,也
- 以前公司的产品已经上线20多年了,主要是维护,也就是改bug。每周我们Team会从Jira上拿我们可以改的bug,因为每个团队负责的业务范围
- 网易Java程序员两轮面试题,请作答。part 1:网易JAVA程序员一面1.volatile有什么用?2.Minor GC和Full GC
- 在Spring mvc的开发中,我们可以通过RequestMapping来配,当前方法用于处理哪一个URL的请求.同样我们现在有一个需求,有
- 前言MyBatis-Plus 是基于 MyBatis 进行封装的一套优秀的持久层框架,它提供了丰富的便捷操作方法和强大的代码生成器,大大简化
- 在我们实现某些功能时,可能会有倒计时的需求。比如发送短信验证码,发送成功后可能要求用户一段时间内不能再次发送,这时候我们就需要进行倒计时,时
- mybatis官网中文文档:https://mybatis.org/mybatis-3/zh/sqlmap-xml.htmlmybatis-
- Java IDE工具提供了多种用户独特需求和个人偏好来创建编程环境的方法。Java框架能够简化程序员的工作。这些框架被设计和开发用于在任何服