Java并发线程池实例分析讲解
作者:飞奔的小付 发布时间:2022-08-05 20:25:40
一.为什么要用线程池
先来看个简单的例子
1.直接new Thread的情况:
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final List<Integer> list = new ArrayList<>();
final Random random = new Random();
for (int i = 0; i < 100000; i++) {
Thread thread = new Thread() {
@Override
public void run() {
list.add(random.nextInt());
}
};
thread.start();
thread.join();
}
System.out.println("执行时间:" + (System.currentTimeMillis() - start));
System.out.println("执行大小:" + list.size());
}
执行时间:6437
执行大小:100000
2.使用线程池时
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final List<Integer> list = new ArrayList<>();
final Random random = new Random();
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100000; i++) {
executorService.execute(()->{
list.add(random.nextInt());
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("执行时间:" + (System.currentTimeMillis() - start));
System.out.println("执行大小:" + list.size());
}
执行时间:82
执行大小:100000
从执行时间可以看出来,使用线程池的效率要远远超过直接new Thread。
二.线程池的好处
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
三.原理解析
四.4种线程池
1.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点:newCachedThreadPool会创建一个可缓存线程池,如果当前线程池的长度超过了处理的需要时,可以灵活的回收空闲的线程,当需要增加时,它可以灵活的添加新的线程,而不会对线程池的长度作任何限制。
因为其最大线程数是Integer.MAX_VALUE,若新建的线程数多了,会超过机器的可用内存而OOM,但是因为其不是 * 队列,所以在OOM之前一般会CPU 100%。
2.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
该方法会创建一个固定长度的线程池,控制最大并发数,超出的线程会在队列中等待,因为线程的数量是固定的,但是阻塞队列是 * 的,如果请求数较多时,会造成阻塞队列越来越长,超出可用内存 进而OOM,所以要根据系统资源设置线程池的大小。Runtime.getRuntime().availableProcessors()
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
会创建一个单一的线程,前一个任务执行完毕才会执行下一个线程,FIFO,保证顺序执行。但是高并发下不太适用
4.newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
创建一个固定长度的线程池,而且支持定时的以及周期性的任务执行,所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
阿里规范中不推荐使用以上线程池,推荐使用自定义的线程池,当然如果你的项目中的数量级比较小的话那到没什么影响。
自定义线程池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());
执行优先级 : 核心线程>非核心线程>队列
提交优先级 : 核心线程>队列>非核心线程
五.线程池处理流程
流程图:
六.源码分析
流程图
ThreadPoolExecutor的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.判断线程数是否小于核心线程数,如果是则使用入参任务通过addWorker方法创建一个新的线程,如果能完成新线程创建execute方法结束,成功提交任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shtdown了),非运行状态下当然是需要reject;
// offer和add方法差不多,add方法就是调用的offer,只不过比offer多抛出一个异常 throw new IllegalStateException("Queue full")
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//3.判断当前工作线程池数是否为0,如果是创建一个null任务,任务在堵塞队列存在了就会从队列中取出这样做的意义是保证线程池在running状态必须有一个任务在执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//4.如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject.拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;
else if (!addWorker(command, false))
reject(command);
}
}
再进入到addWork方法
private boolean addWorker(Runnable firstTask, boolean core) {
// goto写法 重试
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//线程状态非运行并且非shutdown状态任务为空,队列非空就不能新增线程了
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//当前线程达到了最大阈值 就不再新增线程了
return false;
if (compareAndIncrementWorkerCount(c))
//ctl+1工作线程池数量+1如果成功 就跳出死循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//进来的状态和此时的状态发生改变重头开始重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//内部类封装了线程和任务 通过threadfactory创建线程
//毎一个worker就是一个线程数
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新获取线程状态
int rs = runStateOf(ctl.get());
// 状态小于shutdown 就是running状态 或者 为shutdown并且firstTask为空是从队列中处理 任务那就可以放到集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程还没start就是alive就直接异常
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 记录最大线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//失败回退从wokers移除w线程数减1尝试结束线程池
addWorkerFailed(w);
}
return workerStarted;
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
//正在运行woker线程
final Thread thread;
/** Initial task to run. Possibly null. */
//传入的任务
Runnable firstTask;
/** Per-thread task counter */
//完成的任务数监控用
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//禁止线程中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
再来看runworker方法
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 把state从‐1改为0意思是可以允许中断
boolean completedAbruptly = true;
try {
//task不为空或者阻塞队列中拿到了任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果当前线程池状态等于stop就中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//这设置为空等下次循环就会从队列里面获取
task = null;
//完成任务数+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
获取任务的方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取线程池运行状态
// Check if queue empty only if necessary.
//shutdown或者为空那就工作线程‐1同时返回为null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//重新获取工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed是标志超时销毁 核心线程池也是可以销毁的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker中的processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
ThreadPoolExecutor内部有实现4个拒绝策略:(1)、
CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
DiscardPolicy,直接抛弃任务,不做任何处理;
DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交
ScheduledThreadPoolExecutor
schedule:延迟多长时间之后只执行一次;
scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行;
scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长+周期的时长的时间去周期执行;
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
//与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
super.getQueue().add(task);
//如果当前状态无法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
//增加Worker,确保提交的任务能够被执行
ensurePrestart();
}
}
add方法里其实是调用了offer方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//容量扩增50%
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//插入堆尾
siftUp(i, e);
}
if (queue[0] == e) {
//如果新加入的元素成为了堆顶,则原先的leader就无效了
leader = null;
//由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
siftup方法:主要是对队列进行排序
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//获取父节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//如果key节点的执行时间大于父节点的执行时间,不需要再排序了
if (key.compareTo(e) >= 0)
break;
//如果key.compareTo(e)<0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
queue[k] = e;
setIndex(e, k);
//设置索引为k
k = parent;
}
//key设置为排序后的位置中
queue[k] = key;
setIndex(key, k);
}
run方法:
public void run() {
//是否周期性,就是判断period是否为0
boolean periodic = isPeriodic();
//检查任务是否可以被执行
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果非周期性任务直接调用run运行即可
else if (!periodic)
ScheduledFutureTask.super.run();
//如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
//需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
//fixed‐rate模式,时间设置为上一次时间+p,这里的时间只是可以被执行的最小时间,不代表到点就要执行
if (p > 0)
time += p;
else
//fixed‐delay模式,计算下一次任务可以被执行的时间, 差不多就是当前时间+delay值
time = triggerTime(-p);
}
long triggerTime(long delay) {
//如果delay<Long.Max_VALUE/2,则下次执行时间为当前时间+delay,否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有这么一种情况:
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高,那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果.
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE+队首delay
/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
来源:https://blog.csdn.net/feibendexiaoma/article/details/127168833


猜你喜欢
- 本文实例讲述了C#递归实现回文判断算法,分享给大家供大家参考。具体实现方法如下:static void Main(string[] args
- 本文通过优化买票的重复流程来说明享元模式,为了加深对该模式的理解,会以String和基本数据类型的包装类对该模式的设计进一步说明。读者可以拉
- java调用python的几种用法如下:在java类中直接执行python语句在java类中直接调用本地python脚本使用Runtime.
- 本文实例为大家分享了Silverlight实现星星闪烁动画展示的具体代码,供大家参考,具体内容如下原理很简单,生成1000个圆,从随机数来布
- springboot集成redission及分布式锁的使用1、引入jar包<dependency> &
- 定义JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意方法和属性;这种动
- 解析:CLR支持两种类型:值类型和引用类型。用Jeffrey Richter(《CLR via C#》作者)的话来说,“不理解引用类型和值类
- sftp简介sftp是Secure File Transfer Protocol的缩写,安全文件传送协议。可以为传输文件提供一种安全的网络的
- 1. 什么是AOPAOP (Aspect Oriented Programming)意为:面向切面编程,通过预编译方式和运行期 * 实现在
- 本文实例为大家分享了Android实现记事本功能的具体代码,供大家参考,具体内容如下MainActivity.java代码:package
- 前言本文将模块化地介绍如何实现一个动态开辟空间的通讯录,其有以下九个功能:打印主菜单添加联系人删除联系人打印通讯录查找联系人修改联系人置顶联
- import java.util.concurrent.Semaphore;public class ThreeThread {
- 前言本文主要介绍了关于spring boot中servlet启动过程与原理的相关内容,下面话不多说了,来一起看看详细的介绍吧启动过程与原理:
- 1、Java数组介绍在Java中,数组是用来存放同一种数据类型的集合,注意只能存放同一种数据类型(Object类型数组除外)。①、数组的声明
- spring针对Bean之间的循环依赖,有自己的处理方案。关键点就是 * 缓存。当然这种方案不能解决所有的问题,他只能解决Bean单例模式下非
- 导出Excel在很多项目中经常用到,本人介绍了C#实现GridView导出Excel实例代码,也全当给自己留下个学习笔记了。using Sy
- 本文实例讲述了java实现ArrayList根据存储对象排序功能。分享给大家供大家参考,具体如下:与c++中的qsort的实现极为相似,构建
- MyBatis动态sql动态sql处理简单的多参数查询常用标签标签说明if条件判断,与java中的if语句类似where为sql语句动态添加
- 前言在之前的文章我们复习了 ViewGroup 的测量与布局,那么我们这一篇效果就可以在之前的基础上实现一个灵活的九宫格布局。那么一个九宫格
- 实现多文件的上传,基于标准的http来实现。1.多文件上传MyUploader类的实现:/** * * 同步上传多个文件 * 基于标准的h