软件编程
位置:首页>> 软件编程>> java编程>> SpringBoot线程池和Java线程池的使用和实现原理解析

SpringBoot线程池和Java线程池的使用和实现原理解析

作者:Twilight's  发布时间:2022-06-27 07:22:30 

标签:SpringBoot,线程池,Java

SpringBoot线程池和Java线程池的用法和实现原理

使用默认的线程池

方式一:通过@Async注解调用

public class AsyncTest {
   @Async
   public void async(String name) throws InterruptedException {
       System.out.println("async" + name + " " + Thread.currentThread().getName());
       Thread.sleep(1000);
   }
}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication
//@EnableAsync
public class Test1Application {
  public static void main(String[] args) throws InterruptedException {
     ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
     AsyncTest bean = run.getBean(AsyncTest.class);
     for(int index = 0; index <= 10; ++index){
        bean.async(String.valueOf(index));
     }
  }
}

方式二:直接注入 ThreadPoolTaskExecutor

此时可不加 @EnableAsync注解

@SpringBootTest
class Test1ApplicationTests {

@Resource
  ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Test
  void contextLoads() {
     Runnable runnable = () -> {
        System.out.println(Thread.currentThread().getName());
     };

for(int index = 0; index <= 10; ++index){
        threadPoolTaskExecutor.submit(runnable);
     }
  }

}

线程池默认配置信息

SpringBoot线程池的常见配置:

spring:
 task:
   execution:
     pool:
       core-size: 8
       max-size: 16                          # 默认是 Integer.MAX_VALUE
       keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
       allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
       queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
     shutdown:
       await-termination: false              # 线程关闭等待
     thread-name-prefix: task-               # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutor 和 taskExecutor

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
     AsyncAnnotationBeanPostProcessor.DEFAUL
         T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
  return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
     ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

ThreadPoolExecutor executor;
  if (this.taskDecorator != null) {
     executor = new ThreadPoolExecutor(
           this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
           queue, threadFactory, rejectedExecutionHandler) {
        @Override
        public void execute(Runnable command) {
           Runnable decorated = taskDecorator.decorate(command);
           if (decorated != command) {
              decoratedTaskMap.put(decorated, command);
           }
           super.execute(decorated);
        }
     };
  }
  else {
     executor = new ThreadPoolExecutor(
           this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
           queue, threadFactory, rejectedExecutionHandler);

}

if (this.allowCoreThreadTimeOut) {
     executor.allowCoreThreadTimeOut(true);
  }

this.threadPoolExecutor = executor;
  return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
  if (logger.isInfoEnabled()) {
     logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
  }
  if (!this.threadNamePrefixSet && this.beanName != null) {
     setThreadNamePrefix(this.beanName + "-");
  }
  this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

@Configuration
public class ThreadPoolConfiguration {

@Bean("taskExecutor")
   public ThreadPoolTaskExecutor taskExecutor() {
       ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
       //设置线程池参数信息
       taskExecutor.setCorePoolSize(10);
       taskExecutor.setMaxPoolSize(50);
       taskExecutor.setQueueCapacity(200);
       taskExecutor.setKeepAliveSeconds(60);
       taskExecutor.setThreadNamePrefix("myExecutor--");
       taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
       taskExecutor.setAwaitTerminationSeconds(60);
       //修改拒绝策略为使用当前线程执行
       taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       //初始化线程池
       taskExecutor.initialize();
       return taskExecutor;
   }
}

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
   ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
   //设置线程池参数信息
   taskExecutor.setCorePoolSize(10);
   taskExecutor.setMaxPoolSize(50);
   taskExecutor.setQueueCapacity(200);
   taskExecutor.setKeepAliveSeconds(60);
   taskExecutor.setThreadNamePrefix("myExecutor2--");
   taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
   taskExecutor.setAwaitTerminationSeconds(60);
   //修改拒绝策略为使用当前线程执行
   taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
   //初始化线程池
   taskExecutor.initialize();
   return taskExecutor;
}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@Resource
ThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

@Async("taskExecutor2")
   public void async(String name) throws InterruptedException {
       System.out.println("async" + name + " " + Thread.currentThread().getName());
       Thread.sleep(1000);
   }

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                         BlockingQueue<Runnable> workQueue) {
   this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
}

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                             60L, TimeUnit.SECONDS,
                             new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

return new ThreadPoolExecutor(nThreads, nThreads,
                             0L, TimeUnit.MILLISECONDS,
                             new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         new DelayedWorkQueue());
}

周期执行:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
  System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{
  System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}

Java 线程池中的四种拒绝策略

  • CallerRunsPolicy:线程池让调用者去执行。

  • AbortPolicy:如果线程池拒绝了任务,直接报错。

  • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

  • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       if (!e.isShutdown()) {
           r.run();
       }
   }
}

效果类似于:

Runnable thread = ()->{
  System.out.println(Thread.currentThread().getName());
  try {
     Thread.sleep(0);
  } catch (InterruptedException e) {
     throw new RuntimeException(e);
  }
};

thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       throw new RejectedExecutionException("Task " + r.toString() +
                                            " rejected from " +
                                            e.toString());
   }
}

DiscardPolicy

什么也不做。

public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   }
}

DiscardOldestPolicy

  • e.getQueue().poll() : 取出队列最旧的任务。

  • e.execute(r) : 当前任务入队。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       if (!e.isShutdown()) {
           e.getQueue().poll();
           e.execute(r);
       }
   }
}

Java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

private final class Worker
   extends AbstractQueuedSynchronizer
   implements Runnable
{
   /**
    * This class will never be serialized, but we provide a
    * serialVersionUID to suppress a javac warning.
    */
   private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in.  Null if factory fails. */
   final Thread thread;
   /** Initial task to run.  Possibly null. */
   Runnable firstTask;
   /** Per-thread task counter */
   volatile long completedTasks;

/**
    * Creates with given first task and thread from ThreadFactory.
    * @param firstTask the first task (null if none)
    */
   Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
   }

/** Delegates main run loop to outer runWorker  */
   public void run() {
       runWorker(this);
   }

// Lock methods
   //
   // The value 0 represents the unlocked state.
   // The value 1 represents the locked state.

protected boolean isHeldExclusively() {
       return getState() != 0;
   }

protected boolean tryAcquire(int unused) {
       if (compareAndSetState(0, 1)) {
           setExclusiveOwnerThread(Thread.currentThread());
           return true;
       }
       return false;
   }

protected boolean tryRelease(int unused) {
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
   }

public void lock()        { acquire(1); }
   public boolean tryLock()  { return tryAcquire(1); }
   public void unlock()      { release(1); }
   public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
       Thread t;
       if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
           try {
               t.interrupt();
           } catch (SecurityException ignore) {
           }
       }
   }
}

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

final void runWorker(Worker w) {
   Thread wt = Thread.currentThread();
   Runnable task = w.firstTask;
   w.firstTask = null;
   w.unlock(); // allow interrupts
   boolean completedAbruptly = true;
   try {
       while (task != null || (task = getTask()) != null) {
           w.lock();
           // If pool is stopping, ensure thread is interrupted;
           // if not, ensure thread is not interrupted.  This
           // requires a recheck in second case to deal with
           // shutdownNow race while clearing interrupt
           if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
           try {
               beforeExecute(wt, task);
               Throwable thrown = null;
               try {
                   task.run();
               } catch (RuntimeException x) {
                   thrown = x; throw x;
               } catch (Error x) {
                   thrown = x; throw x;
               } catch (Throwable x) {
                   thrown = x; throw new Error(x);
               } finally {
                   afterExecute(task, thrown);
               }
           } finally {
               task = null;
               w.completedTasks++;
               w.unlock();
           }
       }
       completedAbruptly = false;
   } finally {
       processWorkerExit(w, completedAbruptly);
   }
}

来源:https://www.cnblogs.com/twilight0402/p/17305328.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com