软件编程
位置:首页>> 软件编程>> java编程>> java ThreadPoolExecutor线程池拒绝策略避坑

java ThreadPoolExecutor线程池拒绝策略避坑

作者:用户9973317139738  发布时间:2021-09-05 08:39:52 

标签:java,ThreadPoolExecutor,线程池,拒绝策略

1.场景

线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞。

为了便于理解,将实际情景抽象为下面的代码:

ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
       1,
       1,
       1,
       TimeUnit.SECONDS,
       new ArrayBlockingQueue<>(1),
       Executors.defaultThreadFactory(),
       new ThreadPoolExecutor.DiscardOldestPolicy());//新建线程池时核心线程数及最大线程数都设置为1,阻塞队列使用ArrayBlockingQueue,拒绝策略为DiscardOldestPolicy
public void doBusiness(){
   Task task1 = new Task();
   Task task2 = new Task();
   Task task3 = new Task();
   Future<Boolean> future1 = threadPoolExecutor.submit(task1);//当前工作线程为0,会新建一个worker作为工作线程,并执行task1
   Future<Boolean> future2 = threadPoolExecutor.submit(task2);//当前核心线程数已满,会将任务放入阻塞队列
   Future<Boolean> future3 = threadPoolExecutor.submit(task3);
   /*当前核心线程已满并且阻塞队列已满,execute()时会调用ThreadPoolExecutord的addWorker(command,false),由
   于目前task1还没执行完,则工作线程数量为1,已经达到了最大线程数,则addWorker(command,false)返回false,
   触发对应的拒绝策略,会从阻塞队列中移除task2对应的任务(阻塞队列中并不是直接放的task2,而是以task2为入
   参构造的一个FutureTask,参见AbstarctExecutorService的submit(Callable<T> task)方法*/
   try{
       boolean result = future2.get();
       System.out.println(result);
   } catch (ExecutionException e) {
       e.printStackTrace();
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
}
@Test
public void test_doBusiness(){
   doBusiness();//入口
}
private class Task implements Callable<Boolean>{
   @Override
   public Boolean call() throws Exception {
       try {
           Thread.sleep(1000);//模拟业务执行
           return true;
       }catch(Exception e){
           e.printStackTrace();
       }
       return true;
   }
}

2. 原因分析

通过上面代码我们明白了阻塞队列会将task2对应的任务移除,那么为何移除之后调用get()方法线程会一直阻塞呢?

其实Future future2= threadPoolExecutor.submit(task2)实际会调用AbstractExecutorService的submit(Callable task)方法,并且最终返回的future2实际是一个FutureTask类型。

public <T> Future<T> submit(Callable<T> task) {
   if (task == null) throw new NullPointerException();
   RunnableFuture<T> ftask = newTaskFor(task);
   execute(ftask);
   return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   return new FutureTask<T>(callable);
}

因此,我们直接看FutureTask的get()方法

public V get() throws InterruptedException, ExecutionException {
   int s = state;
   if (s &lt;= COMPLETING)
       s = awaitDone(false, 0L);
   return report(s);
}

由于future2已经从阻塞队列中移除,并且从始至终都没有工作线程执行它,即FutureTask的状态一直都为NEW状态,其会进入awaitDone(false,0L)中,接下列我们追踪该方法。

private int awaitDone(boolean timed, long nanos)
   throws InterruptedException {
   final long deadline = timed ? System.nanoTime() + nanos : 0L;
   WaitNode q = null;
   boolean queued = false;
   for (;;) {
       if (Thread.interrupted()) {
           removeWaiter(q);
           throw new InterruptedException();
       }
       int s = state;
       if (s > COMPLETING) {
           if (q != null)
               q.thread = null;
           return s;
       }
       else if (s == COMPLETING) // cannot time out yet
           Thread.yield();
       else if (q == null)//第一次进for循环时q==null,进入到该分支
           q = new WaitNode();
       else if (!queued)//第二次进for循环时queue为false,则使用CAS将q置为waiters的头结点
           queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                q.next = waiters, q);
       else if (timed) {
           nanos = deadline - System.nanoTime();
           if (nanos <= 0L) {
               removeWaiter(q);
               return state;
           }
           LockSupport.parkNanos(this, nanos);
       }
       else//将q置为头结点后,最终会进入这里调用park()方法,阻塞当前线程
           LockSupport.park(this);
   }

从上面的代码可以看出调用future2.get()后会一直阻塞在park()方法处,这便是本次问题出现的原因,

3.总结

本次问题出现主要是同时满足了以下几点:

1)使用了有界的阻塞队列ArrayBlockingQueue

2)工作线程达到了线程池配置的最大线程数

3)拒绝策略使用了DiscardOldestPolicy(使用DiscardPolicy也会出现这个问题)

4.思考

我们日常使用线程池提交任务后,如果在任务执行完成之前调用future的get()方法,当前线程会进入阻塞状态,当任务执行完成后,才会将当前线程唤醒,如何从代码上分析该流程?

首先当任务提交到线程池,如果任务当前在阻塞队列中,则FutureTask的状态依然像上面的情况一样,是处于New状态,调用get()方法依然会到达LockSupport.park(this)处,将当前线程阻塞。什么时候才会将当前线程唤醒了?

那就是当存在工作线程Worker目前分配的任务执行完成后,其会去调用Worker类的getTask()方法从阻塞队列中拿到该任务,并执行该任务的run()方法,下面是FutureTask的run()方法

public void run() {
   if (state != NEW ||
       !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                    null, Thread.currentThread()))
       return;
   try {
       Callable<V> c = callable;
       if (c != null && state == NEW) {
           V result;
           boolean ran;
           try {
               result = c.call();
               ran = true;
           } catch (Throwable ex) {
               result = null;
               ran = false;
               setException(ex);
           }
           if (ran)
               set(result);//如果任务执行成功,则调用set(V result)方法
       }
   } finally {
       // runner must be non-null until state is settled to
       // prevent concurrent calls to run()
       runner = null;
       // state must be re-read after nulling runner to prevent
       // leaked interrupts
       int s = state;
       if (s >= INTERRUPTING)
           handlePossibleCancellationInterrupt(s);
   }
}

其会在执行成功后,调用set(V result)方法

protected void set(V v) {
   if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
       outcome = v;
       UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
       finishCompletion();//
   }
}

然后将FutureTask状态置为NORMAL(FutureTask的状态要和ThreadPoolExecutor的状态区分开),接着调用finishCompletion()方法

private void finishCompletion() {
   // assert state > COMPLETING;
   for (WaitNode q; (q = waiters) != null;) {
       if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
           for (;;) {
               Thread t = q.thread;//q在await()方法中设置的,其值为调用get()方法的线程
               if (t != null) {
                   q.thread = null;
                   LockSupport.unpark(t);//唤醒该线程
               }
               WaitNode next = q.next;
               if (next == null)
                   break;
               q.next = null; // unlink to help gc
               q = next;
           }
           break;
       }
   }
   done();//熟悉的钩子方法
   callable = null;        // to reduce footprint
}

在finishCompletion中唤起因get()而阻塞的线程。

来源:https://juejin.cn/post/7113201013389000734

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com