软件编程
位置:首页>> 软件编程>> java编程>> 详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

作者:JAVA旭阳  发布时间:2022-11-25 17:34:17 

标签:Java,ScheduledThreadPoolExecutor

概述

最近项目上反馈某个重要的定时任务突然不执行了,很头疼,开发环境和测试环境都没有出现过这个问题。定时任务采用的是ScheduledThreadPoolExecutor,后来一看代码发现踩了一个大坑....

还原"大坑"

这个坑就是如果ScheduledThreadPoolExecutor中执行的任务出错抛出异常后,不仅不会打印异常堆栈信息,同时还会取消后面的调度, 直接看例子。

@Test
public void testException() throws InterruptedException {
   // 创建1个线程的调度任务线程池
   ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
   // 创建一个任务
   Runnable runnable = new Runnable() {

volatile int num = 0;

@Override
       public void run() {
           num ++;
           // 模拟执行报错
           if(num > 5) {
               throw new RuntimeException("执行错误");
           }
           log.info("exec num: [{}].....", num);
       }
   };

// 每隔1秒钟执行一次任务
   scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
   Thread.sleep(10000);
}

运行结果:

详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

  • 只执行了5次后,就不打印,不执行了,因为报错了

  • 任务报错,也没有打印一次堆栈,更导致调度任务取消,后果十分严重。

解决方案

解决方法也非常简单,只要通过try catch捕获异常即可。

详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

运行结果:

详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

看到不仅打印了异常堆栈,而且也会进行周期性的调度。

更推荐的做法

更好的建议可以在自己的项目中封装一个包装类,要求所有的调度都提交通过我们统一的包装类, 如下代码:

@Slf4j
public class RunnableWrapper implements Runnable {
   // 实际要执行的线程任务
   private Runnable task;
   // 线程任务被创建出来的时间
   private long createTime;
   // 线程任务被线程池运行的开始时间
   private long startTime;
   // 线程任务被线程池运行的结束时间
   private long endTime;
   // 线程信息
   private String taskInfo;

private boolean showWaitLog;

/**
    * 执行间隔时间多久,打印日志
    */
   private long durMs = 1000L;

// 当这个任务被创建出来的时候,就会设置他的创建时间
   // 但是接下来有可能这个任务提交到线程池后,会进入线程池的队列排队
   public RunnableWrapper(Runnable task, String taskInfo) {
       this.task = task;
       this.taskInfo = taskInfo;
       this.createTime = System.currentTimeMillis();
   }

public void setShowWaitLog(boolean showWaitLog) {
       this.showWaitLog = showWaitLog;
   }

public void setDurMs(long durMs) {
       this.durMs = durMs;
   }

// 当任务在线程池排队的时候,这个run方法是不会被运行的
   // 但是当任务结束了排队,得到线程池运行机会的时候,这个方法会被调用
   // 此时就可以设置线程任务的开始运行时间
   @Override
   public void run() {
       this.startTime = System.currentTimeMillis();

// 此处可以通过调用监控系统的API,实现监控指标上报
       // 用线程任务的startTime-createTime,其实就是任务排队时间
       // 这边打印日志输出,也可以输出到监控系统中
       if(showWaitLog) {
           log.info("任务信息: [{}], 任务排队时间: [{}]ms", taskInfo, startTime - createTime);
       }

// 接着可以调用包装的实际任务的run方法
       try {
           task.run();
       } catch (Exception e) {
           log.error("run task error", e);
           throw e;
       }

// 任务运行完毕以后,会设置任务运行结束的时间
       this.endTime = System.currentTimeMillis();

// 此处可以通过调用监控系统的API,实现监控指标上报
       // 用线程任务的endTime - startTime,其实就是任务运行时间
       // 这边打印任务执行时间,也可以输出到监控系统中
       if(endTime - startTime > durMs) {
           log.info("任务信息: [{}], 任务执行时间: [{}]ms", taskInfo, endTime - startTime);
       }

}
}

使用:

详解Java ScheduledThreadPoolExecutor的踩坑与解决方法

我们还可以在包装类里面封装各种监控行为,如本例打印日志执行时间等。

原理探究

那大家有没有想过为什么任务出错会导致异常无法打印,甚至调度都取消了呢?让我们从源码出发,一探究竟。

1.下面是调度任务的入口方法。

// ScheduledThreadPoolExecutor#scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,
                                             long period,
                                             TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (period <= 0)
       throw new IllegalArgumentException();
   // 将执行任务和参数包装成ScheduledFutureTask对象
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,
                                     null,
                                     triggerTime(initialDelay, unit),
                                     unit.toNanos(period));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   // 延迟执行
   delayedExecute(t);
   return t;
}

这个方法主要做了两个事情:

  • 将执行任务和参数包装成ScheduledFutureTask对象

  • 调用delayedExecute方法延迟执行任务

2.延迟或周期性任务的主要执行方法, 主要是将任务丢到队列中,后续由工作线程获取执行。

// ScheduledThreadPoolExecutor#delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
       if (isShutdown())
           reject(task);
       else {
           // 将任务丢到阻塞队列中
           super.getQueue().add(task);
           if (isShutdown() &&
               !canRunInCurrentRunState(task.isPeriodic()) &&
               remove(task))
               task.cancel(false);
           else
               // 开启工作线程,去执行任务,或者从队列中获取任务执行
               ensurePrestart();
       }
   }

3.现在任务已经在队列中了,我们看下任务执行的内容是什么,还记得前面的包装对象ScheduledFutureTask类,它的实现类是ScheduledFutureTask,继承了Runnable类。

// ScheduledFutureTask#run方法
public void run() {
   // 是不是周期性任务
   boolean periodic = isPeriodic();
   if (!canRunInCurrentRunState(periodic))
       cancel(false);
   // 不是周期性任务的话, 直接调用一次下面的run    
   else if (!periodic)
       ScheduledFutureTask.super.run();
   // 如果是周期性任务,则调用runAndReset方法,如果返回true,继续执行
   else if (ScheduledFutureTask.super.runAndReset()) {
       // 设置下次调度时间
       setNextRunTime();
       // 重新执行调度任务
       reExecutePeriodic(outerTask);
   }
}

这里的关键就是看ScheduledFutureTask.super.runAndReset()方法是否返回true,如果是true的话继续调度。

4.runAndReset方法也很简单,关键就是看报异常如何处理。

// FutureTask#runAndReset
protected boolean runAndReset() {
   if (state != NEW ||
       !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                    null, Thread.currentThread()))
       return false;
   // 是否继续下次调度,默认false
   boolean ran = false;
   int s = state;
   try {
       Callable<V> c = callable;
       if (c != null && s == NEW) {
           try {
               // 执行任务
               c.call();
               // 执行成功的话,设置为true
               ran = true;

// 异常处理,关键点
           } catch (Throwable ex) {
               // 不会修改ran的值,最终是false,同时也不打印异常堆栈
               setException(ex);
           }
       }
   } finally {
       // runner must be non-null until state is settled to
       // prevent concurrent calls to run()
       runner = null;
       // state must be re-read after nulling runner to prevent
       // leaked interrupts
       s = state;
       if (s >= INTERRUPTING)
           handlePossibleCancellationInterrupt(s);
   }
   // 返回结果
   return ran && s == NEW;
}
  • 关键点ran变量,最终返回是不是下次继续调度执行

  • 如果抛出异常的话,可以看到不会修改ran为true。

来源:https://juejin.cn/post/7156452186237435940

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com