实现java简单的线程池
作者:小胖java攻城狮 发布时间:2023-08-09 06:05:15
拆分实现流程
请看下面这张图
首先我们得对线程池进行一个功能拆分
Thread Pool 就是我们的线程池,t1,t2,t3代表三个线程
Blocking Queue代表阻塞队列
main代表main方法的线程
task1,task2,task3代表要执行的每个任务
现在我们梳理一下执行的流程,注意这里是简略版的,文章后面我会给出详细版的
所以此时,我们发现了需要创建几个类,或者说几个角色,分别是
线程池
工作线程
阻塞队列
拒绝策略(干嘛的?就是当线程数已经满了,并且阻塞队列也满了,还有任务想进入阻塞队列的时候,就可以拒绝这个任务)
实现方式
1.拒绝策略
/**
* 拒绝策略
*/
@FunctionalInterface
interface RejectPolicy<T>{
//queue就是我们自己实现的阻塞队列,task是任务
void reject(BlockingQueue<T> queue,T task);
}
2.阻塞队列
我们需要实现四个方法,获取和添加,超时获取和超时添加,至于方法实现的细节,我都备注了大量的注释进行解释。
/**
* 阻塞队列
*/
class BlockingQueue<T>{
//阻塞队列
private Deque<T> queue = new ArrayDeque<>();
//锁
private ReentrantLock lock = new ReentrantLock();
//生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//容量
private int capacity;
public BlockingQueue(int capacity){
this.capacity = capacity;
}
//带有超时阻塞获取
public T poll(long timeout, TimeUnit timeUnit){
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = timeUnit.toNanos(timeout);
while(queue.isEmpty()){
try {
if(nanos <= 0){
//小于0,说明上次没有获取到,代表已经超时了
return null;
}
//返回值是剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
//通知生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
//阻塞获取
public T take(){
lock.lock();
try{
while(queue.isEmpty()){ //如果任务队列为空,代表线程池没有可以执行的内容
try {
/*
也就说此时进来的线程是执行不了任务的,所以此时emptyWaitSet消费者要进行阻塞状态
等待下一次唤醒,然后继续判断队列是否为空
*/
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*
代码执行到这里。说明任务队列不为空,线程池就从任务队列拿出一个任务出来执行
也就是说把阻塞队列的一个任务出队
*/
T t = queue.removeFirst();
/*
然后唤醒之前存放在生成者Condition休息室,因为由于之前阻塞队列已满,fullWaitSet才会进入阻塞状态
所以当阻塞队列删除了任务,就要唤醒之前进入阻塞状态的fullWaitSet
*/
fullWaitSet.signal();
//返回任务
return t;
}finally {
lock.unlock();
}
}
//阻塞添加
public void put(T task){
lock.lock();
try {
while(queue.size() == capacity){ //任务队列满了
try {
System.out.println("等待加入任务队列"+task);
/*
也就说此时进来的任务是进不了阻塞队列的,已经满了,所以此时生产者Condition要进入阻塞状态
等待下一次唤醒,然后继续判断队列是否为空
*/
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//任务队列还未满
System.out.println("加入任务队列"+task);
//把任务加入阻塞队列
queue.addLast(task);
/*
然后唤醒之前存放在消费者Condition休息室,因为由于之前阻塞队列为空,emptyWaitSet才会进入阻塞状态
所以当阻塞队列加入了任务,就要唤醒之前进入阻塞状态的emptyWaitSet
*/
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
//带超时阻塞时间添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while(queue.size() == capacity){
try {
if(nanos < 0){
return false;
}
System.out.println("等待加入任务队列"+task);
//不会一直阻塞,超时就会继续向下执行
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
//获取任务数量
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
//尝试添加任务,如果阻塞队列已经满了,就使用拒绝策略
public void tryPut(RejectPolicy<T> rejectPolicy, T task){
lock.lock();
try {
//判断队列是否已满
if(queue.size() == capacity){
rejectPolicy.reject(this,task);
}else{ //有空闲
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
3.线程池和工作线程
我把工作线程当成线程池的内部类去实现。方便调用变量。
/**
* 线程池
*/
class ThreadPool{
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//执行任务
public void execute(Runnable task){
synchronized (workers){
if(workers.size() <= coreSize){ //当前的线程数小于核心线程数
Worker worker = new Worker(task);
workers.add(worker);
//让线程开始工作,执行它的run方法
worker.start();
}else{
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy,task);
}
}
}
/**
* 工作线程,也就是线程池里面的线程
*/
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
//执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行的任务" + task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//代表这个任务已经执行完了
task = null;
}
}
synchronized (workers) {
System.out.println("worker 被移除" + this);
workers.remove(this);
}
}
}
}
策略模式
细心的小伙伴已经发现,我在拒绝策略这里使用了23种设计模式的策略模式,因为我没有将拒绝的方式写死,而是交给了调用者去实现。
对比JDK的线程池
下面是JDK自带的线程池
经典的七大核心参数
corePoolSize:核心线程数
queueCapacity:任务队列容量(阻塞队列)
maxPoolSize:最大线程数
keepAliveTime:线程空闲时间
TimeUnit unit:超时时间单位
ThreadFactory threadFactory:线程工程
rejectedExecutionHandler:任务拒绝处理器
实际上我们自己实现的也大同小异,只不过JDK官方的更为复杂。
JDK线程执行的流程图
线程池的状态转化
线程我们知道在操作系统层面有5种状态
初始状态:仅是在语言层面创建了线程对象,还未与操作系统线程关联
可运行状态(就绪状态):指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
运行状态:指获取了 CPU 时间片运行中的状态,当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
阻塞状态
如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】
等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
终止状态:表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
线程在Java API层面有6种状态
NEW 线程刚被创建,但是还没有调用 start() 方法
RUNNABLE 当调用了 start() 方法之后,注意,Java API 层面的
RUNNABLE 状态涵盖了 操作系统 层面的【可运行状态】、【运行状态】
BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分
TERMINATED 当线程代码运行结束
线程池有5种状态
RUNNING:能接受新任务,并处理阻塞队列中的任务
SHUTDOWN:不接受新任务,但是可以处理阻塞队列中的任务
STOP:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接不干了!
TIDYING:所有任务都终止,并且工作线程也为0,处于关闭之前的状态
TERMINATED:已关闭。
来源:https://blog.csdn.net/qq_45798556/article/details/118703927


猜你喜欢
- 对于现在的 App 来说,布局页面基本都会用到沉浸式状态栏,单纯的沉浸式状态栏很容易解决,但是在华为手机上存在一个底部虚拟按键的问题,会导致
- 代理模式代理(Proxy)是一种设计模式,提供了对目标对象另外的访问方式;即通过代理对象访问目标对象.这样做的好处是:可以在目标对象实现的基
- Java for循环打印菱形Java代码输出菱形的方法和思路有很多,在此分享一个稍带模块化拆分思想的解决方案,将需要输出的菱形拆分成8个模块
- 前言一般情况下,当我们使用 SpringDataElasticsearch 去操作 ES 时,索引名
- 本文实例为大家分享了Android快速实现断点续传的具体代码,供大家参考,具体内容如下1.导入依赖compile 'com.loop
- unity通过GetVector,GetColor,GetFloat等获取。SetVector,SetColor,SetFloat等设置。这
- 今天把Android Studio 升级到4.1版本,发现GsonFormat没有了,网上有的解决办法从https://plugins.je
- 前言:图片加载涉及到图片的缓存、图片的处理、图片的显示等。四种常用的图片加载框架,分别是Fresco、ImageLoader、 Picass
- 一、什么是JSONJSON(JavaScript Object Notation)是一种基于JavaScript语法子集的开放标准数据交换格
- 关于在Android中实现ListView的弹性效果,有很多不同的方法,网上一搜,也有很多,下面贴出在项目中经常用到的两种实现ListVie
- spring boot配置hikari连接池属性事件起因与一个简单应用经常发生Young GC,甚至在没有请求量的情况下也经常发生GC (A
- 最近遇到一个调试很久的问题,MyBatis 查询 Oracle 数据库查询结果与在客户端查询结果不一致。问题引入测试表、测试数据创建测试表、
- 下面是自己写的三个方法,直接类名.方法即可调用,注意此处未做异常处理.1.下划线转驼峰 lowerLineToHump()2.首字母大写 c
- 本篇我们讲解下使用spring创建bean的几种方式,创建bean,也可以叫组件注册,就是把单例bean放到spring容器中。我们定义如下
- 在 Java 中,方法调用一般通过 Virtual Call 还有 Classic Call。Classic Call 就是直接指向方法的地
- SpringBoot使用Commons Logging进行所有内部日志记录,但保留底层日志实现。默认提供了Java Util Logging
- Kotlin 面向对象这几天一直在准备考试,实在没有时间,已经过去了这么久,终于要到面向对象了!先看看Kotlin中的类长什么样吧.可以看到
- 一、概述我们知道,当我们对es发起search请求或其他操作时,往往都是随机选择一个coordinator发起请求。而这请求,可能是该节点能
- 目录1 简介2 项目整合2.1 JWT整合2.1.1 JWT工具类2.1.2 Token处理的Filter2.1.3 JWT属性2.2 Sp
- 一、Rxjava使用场景为了模拟实际场景,从wanandroid网站找了二个接口,如下:(对Wanandroid表示感谢!)public i