SpringBoot线程池和Java线程池的使用和实现原理解析
作者:Twilight's 发布时间:2022-06-27 07:22:30
SpringBoot线程池和Java线程池的用法和实现原理
使用默认的线程池
方式一:通过@Async注解调用
public class AsyncTest {
@Async
public void async(String name) throws InterruptedException {
System.out.println("async" + name + " " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}
启动类上需要添加@EnableAsync
注解,否则不会生效。
@SpringBootApplication
//@EnableAsync
public class Test1Application {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
AsyncTest bean = run.getBean(AsyncTest.class);
for(int index = 0; index <= 10; ++index){
bean.async(String.valueOf(index));
}
}
}
方式二:直接注入 ThreadPoolTaskExecutor
此时可不加 @EnableAsync
注解
@SpringBootTest
class Test1ApplicationTests {
@Resource
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
void contextLoads() {
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName());
};
for(int index = 0; index <= 10; ++index){
threadPoolTaskExecutor.submit(runnable);
}
}
}
线程池默认配置信息
SpringBoot线程池的常见配置:
spring:
task:
execution:
pool:
core-size: 8
max-size: 16 # 默认是 Integer.MAX_VALUE
keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
allow-core-thread-timeout: true # 是否允许核心线程超时,默认true
queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE
shutdown:
await-termination: false # 线程关闭等待
thread-name-prefix: task- # 线程名称的前缀
SpringBoot 线程池的实现原理
TaskExecutionAutoConfiguration
类中定义了 ThreadPoolTaskExecutor
,该类的内部实现也是基于java原生的 ThreadPoolExecutor
类。initializeExecutor()
方法在其父类中被调用,但是在父类中 RejectedExecutionHandler
被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
,并通过initialize()
方法将AbortPolicy
传入initializeExecutor()
中。
注意在TaskExecutionAutoConfiguration
类中,ThreadPoolTaskExecutor
类的bean的名称为: applicationTaskExecutor
和 taskExecutor
。
// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAUL
T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
覆盖默认的线程池
覆盖默认的 taskExecutor
对象,bean的返回类型可以是ThreadPoolTaskExecutor
也可以是Executor
。
@Configuration
public class ThreadPoolConfiguration {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
管理多个线程池
如果出现了多个线程池,例如再定义一个线程池 taskExecutor2
,则直接执行会报错。此时需要指定bean的名称即可。
@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor2--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@Resource
ThreadPoolTaskExecutor taskExecutor2;
对于使用@Async
注解的多线程则在注解中指定bean的名字即可。
@Async("taskExecutor2")
public void async(String name) throws InterruptedException {
System.out.println("async" + name + " " + Thread.currentThread().getName());
Thread.sleep(1000);
}
线程池的四种拒绝策略
JAVA常用的四种线程池
ThreadPoolExecutor
类的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
newCachedThreadPool
不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE
),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
newFixedThreadPool
定长线程池,超出线程数的任务会在队列中等待。
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
newScheduledThreadPool
类似于newCachedThreadPool
,线程数无上限,但是可以指定corePoolSize
。可实现延迟执行、周期执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
周期执行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);
延时执行:
scheduledThreadPool.schedule(()->{
System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);
newSingleThreadExecutor
单线程线程池,可以实现线程的顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Java 线程池中的四种拒绝策略
CallerRunsPolicy
:线程池让调用者去执行。AbortPolicy
:如果线程池拒绝了任务,直接报错。DiscardPolicy
:如果线程池拒绝了任务,直接丢弃。DiscardOldestPolicy
:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
CallerRunsPolicy
直接在主线程中执行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
效果类似于:
Runnable thread = ()->{
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
thread.run();
AbortPolicy
直接抛出RejectedExecutionException
异常,并指示任务的信息,线程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
e.getQueue().poll()
: 取出队列最旧的任务。e.execute(r)
: 当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Java 线程复用的原理
java
的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker
对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();
。workQueue
是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue
队列中。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
work对象的执行依赖于 runWorker()
,与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
来源:https://www.cnblogs.com/twilight0402/p/17305328.html
猜你喜欢
- 现在Web开发越来越倾向于前后端分离,前端使用AngularJS,React,Vue等,部署在NodeJS上,后面采用SpringBoot发
- 有时候编译器、处理器的优化会导致runtime与我们设想的不一样,为此Java对编译器和处理器做了一些限制,JAVA内存模型(JMM)将这些
- 服务降级服务压力剧增的时候,根据当前的业务情况及流量对一些服务和页面有策略的降级,以此缓解服务器的压力,以保证核心任务的进行。同时保证部分甚
- 持久层的那些事什么是 JDBCJDBC(JavaDataBase Connectivity)就是 Java 数据库连接, 说的直白点就是 使
- 前言前面介绍了APP顶部导航栏AppBar,今天来介绍下Flutter实现APP底部导航栏。我们以仿写微信的底部导航栏来举例说明。要实现类似
- 在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。
- 代码如下import java.util.concurrent.Callable;import java.util.concurrent.E
- 1、认识XML解析技术1.1、XML相关概念(1)DTD:XML语法规则,是XML文件的验证机制,可以通过比较XML文档和DTD文件看文档是
- java 闰年判断前言:给定一个年份,判断这一年是不是闰年。当以下情况之一满足时,这一年是闰年:1. 年份是4的倍数而不是100的倍数;2.
- 动态内存管理为什么存在动态内存分配我们到现在为止掌握的是什么样的内存开辟方式呢//创建一个变量int val = 20; &n
- 前言本文的记录如何用CustomPaint、GestureDetector实现一个进度条控件。首先需要说明的是 flutter Materi
- 在项目里,我需要做一个Spring Boot结合Thymeleaf前端模版,结合JPA实现分页的演示效果。做的时候发现有些问题,也查了现有网
- 下面给大家介绍下mybatis结果生成键值对的实例代码,具体内容如下所示:在实际应用中我们经常会遇到这样的情况,需要给下拉框赋值,这个时候就
- 最近在配置OpenCV的时候,由于使用的是VS2019,结果找不到Microsoft.Cpp.X64.user这个文件。导致每次新建项目都得
- 这篇文章主要介绍了SpringBoot FreeWorker模板技术解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考
- 在程序中封装了一个List集合对象,然后需要把该集合中的实体插入到数据库中,由于项目使用了Spring+MyBatis的配置,所以打算使用M
- 第一步:下载需要添加的jar包可以在maven库中查找下载,也可以在对应官网下载maven库网址:https://mvnrepository
- Spring中有很多继承于aware中的接口,这些接口到底是做什么用到的。aware,翻译过来是知道的,已感知的,意识到的,所以这些接口从字
- 前言为什么用动静态库我们在实际开发中,经常要使用别人已经实现好的功能,这是为了开发效率和鲁棒性(健壮性);因为那些功能都是顶尖的工程师已经写
- 按行读取文件package test; import java.io.*; import java.util.*; public class