当前位置: 58彩票app下载 > 编程技术 > 正文

出现编制程序

时间:2019-09-23 12:41来源:编程技术
在上一篇线程池的文章《并发编程—— Java 线程池实现原理与源码深度解析》中从ThreadPoolExecutor源码分析了其运行机制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文继续

在上一篇线程池的文章《并发编程—— Java 线程池 实现原理与源码深度解析》中从ThreadPoolExecutor源码分析了其运行机制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文继续从源代码出发分析ScheduledThreadPoolExecutor的内部原理。

前言

在上一篇线程池的文章《Java线程池原理分析ThreadPoolExecutor篇》中从ThreadPoolExecutor源码分析了其运行机制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文继续从源代码出发分析ScheduledThreadPoolExecutor的内部原理。

类声明

1 public class ScheduledThreadPoolExecutor2         extends ThreadPoolExecutor3         implements ScheduledExecutorService {

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService。因此它具有ThreadPoolExecutor的所有能力。所不同的是它具有定时执行,以周期或间隔循环执行任务等功能。

这里我们先看下ScheduledExecutorService的源码:

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor同ThreadPoolExecutor一样也可以从 Executors线程池工厂创建,所不同的是它具有定时执行,以周期或间隔循环执行任务等功能。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

}

public interface ScheduledExecutorService extends ExecutorService {
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它具有ThreadPoolExecutor的所有能力。
通过super方法的参数可知,核心线程的数量即传入的参数,而线程池的线程数为Integer.MAX_VALUE,几乎为无上限。
这里采用了DelayedWorkQueue任务队列,也是定时任务的核心,留在后面分析。

ScheduledThreadPoolExecutor实现了ScheduledExecutorService 中的接口:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

延时执行Callable任务的功能。

 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

延时执行Runnable任务的功能。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

可以延时循环执行周期性任务。

假设任务执行时间固定2s,period为1s,因为任务的执行时间大于规定的period,所以任务会每隔2s(任务执行时间)开始执行一次。如果任务执行时间固定为0.5s,period为1s,因为任务执行时间小于period,所以任务会每隔1s(period)开始执行一次。实际任务的执行时间即可能是大于period的,也可能小于period,scheduleAtFixedRate的好处就是每次任务的开始时间间隔必然大于等于period。

假设一项业务需求每天凌晨3点将数据库备份,然而数据库备份的时间小于24H,最适合用scheduleAtFixedRate方法实现。

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可以延时以相同间隔时间循环执行任务。

假设任务执行的时间固定为2s,delay为1s,那么任务会每隔3s(任务时间+delay)开始执行一次。

如果业务需求本次任务的结束时间与下一个任务的开始时间固定,使用scheduleWithFixedDelay可以方便地实现业务。

ScheduledExecutorService

 1 //可调度的执行者服务接口 2 public interface ScheduledExecutorService extends ExecutorService { 3  4     //指定时延后调度执行任务,只执行一次,没有返回值 5     public ScheduledFuture<?> schedule(Runnable command, 6                                        long delay, TimeUnit unit); 7  8     //指定时延后调度执行任务,只执行一次,有返回值 9     public <V> ScheduledFuture<V> schedule(Callable<V> callable,10                                            long delay, TimeUnit unit);11 12     //指定时延后开始执行任务,以后每隔period的时长再次执行该任务13     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,14                                                   long initialDelay,15                                                   long period,16                                                   TimeUnit unit);17 18     //指定时延后开始执行任务,以后任务执行完成后等待delay时长,再次执行任务19     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,20                                                      long initialDelay,21                                                      long delay,22                                                      TimeUnit unit);23 }

其中schedule方法用于单次调度执行任务。这里主要理解下后面两个方法。

  • scheduleAtFixedRate:该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务,看下图示例

图片 1

  • scheduleWithFixDelay:该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务,看下图示例。

图片 2

ScheduledFuture

四个执行任务的方法都返回了ScheduledFuture对象,它与Future有什么区别?

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
    public int compareTo(T o);
}

可以看到ScheduledFuture也继承了Future,并且继承了Delayed,增加了getDelay方法,而Delayed继承自Comparable,所以具有compareTo方法。

使用例子

1、schedule(Runnable command,long delay, TimeUnit unit)

 1 /** 2  * @author: ChenHao 3  * @Date: Created in 14:54 2019/1/11 4  */ 5 public class Test1 { 6     public static void main(String[] args) throws ExecutionException, InterruptedException { 7         // 延迟1s后开始执行,只执行一次,没有返回值 8         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); 9         ScheduledFuture<?> result = executorService.schedule(new Runnable() {10             @Override11             public void run() {12                 System.out.println("gh");13                 try {14                     Thread.sleep(3000);15                 } catch (InterruptedException e) {16                     // TODO Auto-generated catch block17                     e.printStackTrace();18                 }19             }20         }, 1000, TimeUnit.MILLISECONDS);21         System.out.println(result.get;22     }23 }

运行结果:

图片 3

2、schedule(Callable<V> callable,long delay, TimeUnit unit);

 1 public class Test2 { 2     public static void main(String[] args) throws ExecutionException, InterruptedException { 3         // 延迟1s后开始执行,只执行一次,有返回值 4         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); 5         ScheduledFuture<String> result = executorService.schedule(new Callable<String>() { 6             @Override 7             public String call() throws Exception { 8                 try { 9                     Thread.sleep(3000);10                 } catch (InterruptedException e) {11                     // TODO Auto-generated catch block12                     e.printStackTrace();13                 }14                 return "ghq";15             }16         }, 1000, TimeUnit.MILLISECONDS);17         // 阻塞,直到任务执行完成18         System.out.print(result.get;19     }20 }

运行结果:

图片 4

3、scheduleAtFixedRate

 1 /** 2  * @author: ChenHao 3  * @Date: Created in 14:54 2019/1/11 4  */ 5 public class Test3 { 6     public static void main(String[] args) throws ExecutionException, InterruptedException { 7         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); 8         // 从加入任务开始算1s后开始执行任务,1+2s开始执行,1+2*2s执行,1+n*2s开始执行; 9         // 但是如果执行任务时间大于2s则不会并发执行后续任务,当前执行完后,接着执行下次任务。10         ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {11             @Override12             public void run() {13                 System.out.println(System.currentTimeMillis;14             }15         }, 1000, 2000, TimeUnit.MILLISECONDS);16     }17 }

运行结果:

图片 5

4、scheduleWithFixedDelay

 1 /** 2  * @author: ChenHao 3  * @Date: Created in 14:54 2019/1/11 4  */ 5 public class Test4 { 6     public static void main(String[] args) throws ExecutionException, InterruptedException { 7         //任务间以固定时间间隔执行,延迟1s后开始执行任务,任务执行完毕后间隔2s再次执行,任务执行完毕后间隔2s再次执行,依次往复 8         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); 9         ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {10             @Override11             public void run() {12                 System.out.println(System.currentTimeMillis;13             }14         }, 1000, 2000, TimeUnit.MILLISECONDS);15 16         // 由于是定时任务,一直不会返回17         result.get();18         System.out.println("over");19     }20 }

运行结果:

图片 6

四种执行定时任务的方法

源码分析

schedule(Runnable command,long delay, TimeUnit unit)

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

这个方法中出现了几个陌生的类,首先是ScheduledFutureTask:

private class ScheduledFutureTask<V>  extends FutureTask<V> implements RunnableScheduledFuture<V> {
            ...
            ScheduledFutureTask(Runnable r, V result, long ns) {
              super(r, result);
              this.time = ns;
              this.period = 0;
              this.sequenceNumber = sequencer.getAndIncrement();
           }
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

这个类是ScheduledThreadPoolExecutor的内部类,继承自FutureTask实现了RunnableScheduledFuture接口。RunnableScheduledFuture有些复杂,继承自RunnableFuture和ScheduledFuture接口。可见ScheduledThreadPoolExecutor身兼多职。这个类既可以作为Runnable被线程执行,又可以作为FutureTask用于获取Callable任务call方法返回的结果。

在FutureTask的构造方法中传入Runnable对象会将其转换为返回值为null的Callable对象。

/**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param <V> the type of the task's result
     * @return a task that can execute the runnable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

从decorateTask的字面意义判断它将具体的RunnableScheduledFuture实现类向上转型为RunnableScheduledFuture接口。从它的方法描述和实现看出它只是简单的将ScheduledFutureTask向上转型为RunnableScheduledFuture接口,由protected 修饰符可知设计者希望子类扩展这个方法的实现。

之所以向上转型为RunnableScheduledFuture接口,设计者也是希望将具体与接口分离。

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            //情况一
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                //情况二
                task.cancel(false);
            else
                //情况三
                ensurePrestart();
        }
    }

    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

delayedExecute方法负责执行延时任务。
情况一 : 先判断线程池是否关闭,若关闭则拒绝任务。
情况二:线程池未关闭,将任务添加到父类的任务队列,即DelayedWorkQueue中。下面再次判断线程池是否关闭,并且判断canRunInCurrentRunState方法的返回值是否为false。因为传入Runnable参数,task.isPeriodic()为false,所以isRunningOrShutdown返回true。所以这里不会执行到。
情况三:任务成功添加到任务队列,执行ensurePrestart方法。

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

addWorker已经在ThreadPoolExecutor篇分析过,该方法负责同步将线程池数量+1,并且创建Worker对象添加到HashSet中,最后开启Worker对象中的线程。因为RunnableScheduledFuture对象已经被添加到任务队列,Worker中的线程通过getTask方法自然会取到DelayedWorkQueue中的RunnableScheduledFuture任务并执行它的run方法。

这里需要注意的是addWorker方法只在核心线程数未达上限或者没有线程的情况下执行,并不像ThreadPoolExecutor那样可以同时存在多个非核心线程,ScheduledThreadPoolExecutor最多只支持一个非核心线程,除非它终止了不会再创建新的非核心线程。

构造器

1 public ScheduledThreadPoolExecutor(int corePoolSize) {2        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,3              new DelayedWorkQueue;4 }

内部其实都是调用了父类ThreadPoolExecutor的构造器,因此它具有ThreadPoolExecutor的所有能力。

通过super方法的参数可知,核心线程的数量即传入的参数,而线程池的线程数为Integer.MAX_VALUE,几乎为无上限。
这里采用了DelayedWorkQueue任务队列,也是定时任务的核心,是一种优先队列,时间小的排在前面,所以获取任务的时候就能先获取到时间最小的执行,可以看我上篇文章《并发编程—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue》。

由于这里队列没有定义大小,所以队列不会添加满,因此最大的线程数就是核心线程数,超过核心线程数的任务就放在队列里,并不重新开启临时线程。

我们先来看看几个入口方法的实现:

 1 public ScheduledFuture<?> schedule(Runnable command, 2                                    long delay, 3                                    TimeUnit unit) { 4     if (command == null || unit == null) 5         throw new NullPointerException(); 6     RunnableScheduledFuture<?> t = decorateTask(command, 7         new ScheduledFutureTask<Void>(command, null, 8                                       triggerTime(delay, unit))); 9     delayedExecute;10     return t;11 }12 13 public <V> ScheduledFuture<V> schedule(Callable<V> callable,14                                        long delay,15                                        TimeUnit unit) {16     if (callable == null || unit == null)17         throw new NullPointerException();18     RunnableScheduledFuture<V> t = decorateTask(callable,19         new ScheduledFutureTask<V>(callable,20                                    triggerTime(delay, unit)));21     delayedExecute;22     return t;23 }24 25 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,26                                               long initialDelay,27                                               long period,28                                               TimeUnit unit) {29     if (command == null || unit == null)30         throw new NullPointerException();31     if (period <= 0)32         throw new IllegalArgumentException();33     ScheduledFutureTask<Void> sft =34         new ScheduledFutureTask<Void>(command,35                                       null,36                                       triggerTime(initialDelay, unit),37                                       unit.toNanos;38     RunnableScheduledFuture<Void> t = decorateTask(command, sft);39     sft.outerTask = t;40     delayedExecute;41     return t;42 }43 44 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,45                                                  long initialDelay,46                                                  long delay,47                                                  TimeUnit unit) {48     if (command == null || unit == null)49         throw new NullPointerException();50     if (delay <= 0)51         throw new IllegalArgumentException();52     ScheduledFutureTask<Void> sft =53         new ScheduledFutureTask<Void>(command,54                                       null,55                                       triggerTime(initialDelay, unit),56                                       unit.toNanos(-delay));57     RunnableScheduledFuture<Void> t = decorateTask(command, sft);58     sft.outerTask = t;59     delayedExecute;60     return t;61 }

这几个方法都是将任务封装成了ScheduledFutureTask,上面做的首先把runnable装饰为delay队列所需要的格式的元素,然后把元素加入到阻塞队列,然后线程池线程会从阻塞队列获取超时的元素任务进行处理,下面看下队列元素如何实现的。

schedule(Callable<V> callable, long delay, TimeUnit unit)

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

     ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

与schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通过ScheduledFutureTask的get方法得到返回值外没有区别。

ScheduledFutureTask

ScheduledFutureTask是一个延时定时任务,它可以返回任务剩余延时时间,可以被周期性地执行。

属性

 1 private class ScheduledFutureTask<V> 2         extends FutureTask<V> implements RunnableScheduledFuture<V> { 3         /** 是一个序列,每次创建任务的时候,都会自增。 */ 4         private final long sequenceNumber; 5  6         /** 任务能够开始执行的时间 */ 7         private long time; 8  9         /**10          * 任务周期执行的时间11          * 0表示不是一个周期定时任务12          * 正数表示固定周期时间去执行任务13          * 负数表示任务完成之后,延时period时间再去执行任务14          */15         private final long period;16 17         /** 表示再次执行的任务,在reExecutePeriodic中调用 */18         RunnableScheduledFuture<V> outerTask = this;19 20         /**21          * 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。22          */23         int heapIndex;24 }

ScheduledFutureTask继承了FutureTask 和RunnableScheduledFuture

属性说明:

  1. sequenceNumber: 是一个序列,每次创建任务的时候,都会自增。
  2. time: 任务能够开始执行的时间。
  3. period: 任务周期执行的时间。0表示不是一个周期定时任务。
  4. outerTask: 表示再次执行的任务,在reExecutePeriodic中调用
  5. heapIndex: 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。

构造器

  • 创建延时任务
 1 /** 2  * 创建延时任务 3  */ 4 ScheduledFutureTask(Runnable r, V result, long ns) { 5     // 调用父类的方法 6     super(r, result); 7     // 任务开始的时间 8     this.time = ns; 9     // period是0,不是一个周期定时任务10     this.period = 0;11     // 每次创建任务的时候,sequenceNumber都会自增12     this.sequenceNumber = sequencer.getAndIncrement();13 }14 15  /**16  * 创建延时任务17  */18 ScheduledFutureTask(Callable<V> callable, long ns) {19     // 调用父类的方法20     super;21     // 任务开始的时间22     this.time = ns;23     // period是0,不是一个周期定时任务24     this.period = 0;25     // 每次创建任务的时候,sequenceNumber都会自增26     this.sequenceNumber = sequencer.getAndIncrement();27 }

我们看看super(),其实就是FutureTask 里面的构造方法,关于FutureTask 可以看看我之前的文章《Java 多线程—— 线程池基础 之 FutureTask源码解析》

 1 public FutureTask(Runnable runnable, V result) { 2     this.callable = Executors.callable(runnable, result); 3     this.state = NEW;       // ensure visibility of callable 4 } 5 public FutureTask(Callable<V> callable) { 6     if (callable == null) 7         throw new NullPointerException(); 8     this.callable = callable; 9     this.state = NEW;       // ensure visibility of callable10 }
  • 创建延时定时任务
 1 /** 2  * 创建延时定时任务 3  */ 4 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 5     // 调用父类的方法 6     super(r, result); 7     // 任务开始的时间 8     this.time = ns; 9     // 周期定时时间10     this.period = period;11     // 每次创建任务的时候,sequenceNumber都会自增12     this.sequenceNumber = sequencer.getAndIncrement();13 }

延时定时任务不同的是设置了period,后面通过判断period是否为0来确定是否是定时任务。

run()

 1 public void run() { 2     // 是否是周期任务 3     boolean periodic = isPeriodic(); 4     // 如果不能在当前状态下运行,那么就要取消任务 5     if (!canRunInCurrentRunState) 6         cancel(false); 7     // 如果只是延时任务,那么就调用run方法,运行任务。 8     else if (!periodic) 9         ScheduledFutureTask.super.run();10     // 如果是周期定时任务,调用runAndReset方法,运行任务。11     // 这个方法不会改变任务的状态,所以可以反复执行。12     else if (ScheduledFutureTask.super.runAndReset {13         // 设置周期任务下一次执行的开始时间time14         setNextRunTime();15         // 重新执行任务outerTask16         reExecutePeriodic(outerTask);17     }18 }

这个方法会在ThreadPoolExecutor的runWorker方法中调用,而且这个方法调用,说明肯定已经到了任务的开始时间time了。这个方法我们待会会再继续来回看一下

  1. 先判断当前线程状态能不能运行任务,如果不能,就调用cancel()方法取消本任务。
  2. 如果任务只是一个延时任务,那么调用父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。
  3. 如果任务只是一个周期定时任务,那么就任务必须能够反复执行,那么就不能调用run()方法,它会改变任务的状态。而是调用runAndReset()方法,只是简单地运行任务,而不会改变任务状态。
  4. 设置周期任务下一次执行的开始时间time,并重新执行任务。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

与上述两个方法的区别在于ScheduledFutureTask的构造函数多了参数period,即任务执行的最小周期:

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

schedule(Runnable command, long delay,TimeUnit unit)

 1 public ScheduledFuture<?> schedule(Runnable command, 2                                   long delay, 3                                   TimeUnit unit) { 4    if (command == null || unit == null) 5        throw new NullPointerException(); 6  7    //装饰任务,主要实现public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法 8    RunnableScheduledFuture<?> t = decorateTask(command, 9        new ScheduledFutureTask<Void>(command, null,10                                      triggerTime(delay, unit)));11    //添加任务到延迟队列12    delayedExecute;13    return t;14 }

获取延时执行时间

 1 private long triggerTime(long delay, TimeUnit unit) { 2     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 3 } 4  5 /** 6  * Returns the trigger time of a delayed action. 7  */ 8 long triggerTime(long delay) { 9     //当前时间加上延时时间10     return now() +11         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree;12 }

上述的decorateTask方法把Runnable任务包装成ScheduledFutureTask,用户可以根据自己的需要覆写该方法:

1 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {2     return task;3 }

schedule的核心是其中的delayedExecute方法:

 1 private void delayedExecute(RunnableScheduledFuture<?> task) { 2     if (isShutdown   // 线程池已关闭 3         reject;   // 任务拒绝策略 4     else { 5         //将任务添加到任务队列,会根据任务延时时间进行排序 6         super.getQueue().add; 7         // 如果线程池状态改变了,当前状态不能运行任务,那么就尝试移除任务, 8         // 移除成功,就取消任务。 9         if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic && remove10             task.cancel(false);  // 取消任务11         else12             // 预先启动工作线程,确保线程池中有工作线程。13             ensurePrestart();14     }15 }

这个方法的主要作用就是将任务添加到任务队列中,因为这里任务队列是优先级队列DelayedWorkQueue,它会根据任务的延时时间进行排序。

  • 如果线程池不是RUNNING状态,不能执行延时任务task,那么调用reject方法,拒绝执行任务task。

  • 将任务添加到任务队列中,会根据任务的延时时间进行排序。

  • 因为是多线程并发环境,就必须判断在添加任务的过程中,线程池状态是否被别的线程更改了,那么就可能要取消任务了。

  • 将任务添加到任务队列后,还要确保线程池中有工作线程,不然任务也不为执行。所以ensurePrestart()方法预先启动工作线程,确保线程池中有工作线程。

 1 void ensurePrestart() { 2     // 线程池中的线程数量 3     int wc = workerCountOf); 4     // 如果小于核心池数量,就创建新的工作线程 5     if (wc < corePoolSize) 6         addWorker(null, true); 7     // 说明corePoolSize数量是0,必须创建一个工作线程来执行任务 8     else if (wc == 0) 9         addWorker(null, false);10 }

通过ensurePrestart可以看到,如果核心线程池未满,则新建的工作线程会被放到核心线程池中。如果核心线程池已经满了,ScheduledThreadPoolExecutor不会像ThreadPoolExecutor那样再去创建归属于非核心线程池的工作线程,加入到队列就完了,等待核心线程执行完任务再拉取队列里的任务。也就是说,在ScheduledThreadPoolExecutor中,一旦核心线程池满了,就不会再去创建工作线程。

这里思考一点,什么时候会执行else if 创建一个归属于非核心线程池的工作线程?
答案是,当通过setCorePoolSize方法设置核心线程池大小为0时,这里必须要保证任务能够被执行,所以会创建一个工作线程,放到非核心线程池中。

看到addWorker(null, true); 并没有将任务设置进入,而是设置的null, 则说明线程池里线程第一次启动时,runWorker中取到的firstTask为null,需要通过getTask() 从队列中取任务,这里可以看看我之前写的关于线程池的文章《并发编程—— Java 线程池 实现原理与源码深度解析》。

getTask()中 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();如果是存在核心线程则调用take(),如果传入的核心线程为0,则存在一个临时线程,调用poll(),这两个方法都会先获取时间,看看有没有达到执行时间,没有达到执行时间则阻塞,可以看看我上一篇文章,达到执行时间,则取到任务,就会执行下面的run方法。

 1 public void run() { 2     // 是否是周期任务 3     boolean periodic = isPeriodic(); 4     // 如果不能在当前状态下运行,那么就要取消任务 5     if (!canRunInCurrentRunState) 6         cancel(false); 7     // 如果只是延时任务,那么就调用run方法,运行任务。 8     else if (!periodic) 9         ScheduledFutureTask.super.run();10     // 如果是周期定时任务,调用runAndReset方法,运行任务。11     // 这个方法不会改变任务的状态,所以可以反复执行。12     else if (ScheduledFutureTask.super.runAndReset {13         // 设置周期任务下一次执行的开始时间time14         setNextRunTime();15         // 重新执行任务outerTask16         reExecutePeriodic(outerTask);17     }18 }19 20 public boolean isPeriodic() {21     return period != 0;22 }

schedule不是周期任务,那么调用父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

与scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的区别是参数delay传入到ScheduledFutureTask的构造方法中是以负数的形式。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

 1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 2                                              long initialDelay, 3                                              long period, 4                                              TimeUnit unit) { 5    if (command == null || unit == null) 6        throw new NullPointerException(); 7    if (period <= 0) 8        throw new IllegalArgumentException(); 9    //装饰任务类,注意period=period>0,不是负的10    ScheduledFutureTask<Void> sft =11        new ScheduledFutureTask<Void>(command,12                                      null,13                                      triggerTime(initialDelay, unit),14                                      unit.toNanos;15    RunnableScheduledFuture<Void> t = decorateTask(command, sft);16    sft.outerTask = t;17    //添加任务到队列18    delayedExecute;19    return t;20 }

如果是周期任务则执行上面run()方法中的第12行,调用父类中的runAndReset(),这个方法同run方法比较的区别是call方法执行后不设置结果,因为周期型任务会多次执行,所以为了让FutureTask支持这个特性除了发生异常不设置结果。

执行完任务后通过setNextRunTime方法计算下一次启动时间:

 1 private void setNextRunTime() { 2    long p = period; 3   //period=delay; 4    if (p > 0) 5        time += p;//由于period>0所以执行这里,设置time=time+delay 6    else 7        time = triggerTime(-p); 8 } 9 10 long triggerTime(long delay) {11     return now() +12         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree;13 }

scheduleAtFixedRate会执行到情况一,下一次任务的启动时间最早为上一次任务的启动时间加period。
scheduleWithFixedDelay会执行到情况二,这里很巧妙的将period参数设置为负数到达这段代码块,在此又将负的period转为正数。情况二将下一次任务的启动时间设置为当前时间加period。

然后将任务再次添加到任务队列:

 1 /** 2  * 重新执行任务task 3  */ 4 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 5     // 判断当前线程池状态能不能运行任务 6     if (canRunInCurrentRunState(true)) { 7         // 将任务添加到任务队列,会根据任务延时时间进行排序 8         super.getQueue().add; 9         // 如果线程池状态改变了,当前状态不能运行任务,那么就尝试移除任务,10         // 移除成功,就取消任务。11         if (!canRunInCurrentRunState(true) && remove12             task.cancel(false);13         else14             // 预先启动工作线程,确保线程池中有工作线程。15             ensurePrestart();16     }17 }

这个方法与delayedExecute方法很像,都是将任务添加到任务队列中。

  1. 如果当前线程池状态能够运行任务,那么任务添加到任务队列。
  2. 如果在在添加任务的过程中,线程池状态是否被别的线程更改了,那么就要进行判断,是否需要取消任务。
  3. 调用ensurePrestart()方法,预先启动工作线程,确保线程池中有工作线程。

小结

四种延时启动任务的方法除了构造ScheduledFutureTask的参数不同外,运行机制是相同的。先将任务添加到DelayedWorkQueue 中,然后创建Worker对象,启动内部线程轮询DelayedWorkQueue 中的任务。

那么DelayedWorkQueue的add方法是如何实现的,线程轮询DelayedWorkQueue 调用的poll和take方法又如何实现?

回顾getTask方法获取任务时的代码片段:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

如果我们设置ScheduledThreadPoolExecutor的核心线程数量为0,则执行poll方法。而对于核心线程则执行take方法。

下面分析DelayedWorkQueue 的具体实现。

ScheduledFuture的get方法

既然ScheduledFuture的实现是ScheduledFutureTask,而ScheduledFutureTask继承自FutureTask,所以ScheduledFuture的get方法的实现就是FutureTask的get方法的实现,FutureTask的get方法的实现分析在ThreadPoolExecutor篇已经写过,这里不再叙述。要注意的是ScheduledFuture的get方法对于非周期任务才是有效的。

DelayedWorkQueue

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
}

首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的静态内部类。它的内部有一个RunnableScheduledFuture数组,且初始容量为16.这里提前说明下,queue 数组储存的其实是二叉树结构的索引,这个二叉树其实就是最小堆。

推荐博客

  

add方法

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

在执行add方法时内部执行的是offer方法,添加RunnableScheduledFuture任务到队列时先通过内部的ReentrantLock加锁,因此在多线程调用schedule(Runnable command,long delay, TimeUnit unit)添加任务时也能保证同步。

接下来先判断队列是否已满,若已满就先通过grow方法扩容。扩容算法是将现有容量*1.5,然后将旧的数组复制到新的数组。(左移一位等于除以2)。

然后判断插入的是否为第一个任务,如果是就将RunnableScheduledFuture向下转型为ScheduledFutureTask,并将其heapIndex 属性设置为0.

如果不是第一个任务,则执行siftUp方法。该方法先找到父亲RunnableScheduledFuture对象节点,将要插入的RunnableScheduledFuture节点与之compareTo比较,若父亲RunnableScheduledFuture对象的启动时间小于当前要插入的节点的启动时间,则将节点插入到末尾。反之会对二叉树以启动时间升序重新排序RunnableScheduledFuture接口的实现其实是ScheduledFutureTask类:

    new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);

    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    final long now() {
        return System.nanoTime();
    }

第三个参数triggerTime方法返回的就是任务延时的时间加上当前时间。

在ScheduledFutureTask内部实现了compareTo方法:

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

比较的两个任务的启动时间。所以DelayedWorkQueue内部的二叉树是以启动时间早晚排序的。

ScheduledThreadPoolExecutor总结

  • ScheduledThreadPoolExecutor是实现自ThreadPoolExecutor的线程池,构造方法中传入参数n,则最多会有n个核心线程工作,空闲的核心线程不会被自动终止,而是一直阻塞在DelayedWorkQueue的take方法尝试获取任务。构造方法传入的参数为0,ScheduledThreadPoolExecutor将以非核心线程工作,并且最多只会创建一个非核心线程,参考上文中ensurePrestart方法的执行过程。而这个非核心线程以poll方法获取定时任务之所以不会因为超时就被回收,是因为任务队列并不为空,只有在任务队列为空时才会将空闲线程回收,详见ThreadPoolExecutor篇的runWorker方法,之前我以为空闲的非核心线程超时就会被回收是不正确的,还要具备任务队列为空这个条件。

  • ScheduledThreadPoolExecutor的定时执行任务依赖于DelayedWorkQueue,其内部用可扩容的数组实现以启动时间升序的二叉树。

  • 工作线程尝试获取DelayedWorkQueue的任务只有在任务到达指定时间才会成功,否则非核心线程会超时返回null,核心线程一直阻塞。

  • 对于非周期型任务只会执行一次并且可以通过ScheduledFuture的get方法阻塞得到结果,其内部实现依赖于FutureTask的get方法。

  • 周期型任务通过get方法无法获取有效结果,因为FutureTask对于周期型任务执行的是runAndReset方法,并不会设置结果。周期型任务执行完毕后会重新计算下一次启动时间并且再次添加到DelayedWorkQueue中。

poll方法

        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null) {
                        //情况一 空队列
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //情况二 已到启动时间  
                            return finishPoll(first);
                        if (nanos <= 0)
                            //情况三 未到启动时间,但是线程等待超时
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

非核心线程会通过poll方法同步获取任务队列中的RunnableScheduledFuture,如果队列为空或者在timeout内还等不到任务的启动时间,都将返回null。如果任务队列不为空,并且首个任务已到启动时间线程就能够获取RunnableScheduledFuture任务并执行run方法。

take方法

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

与非核心线程执行的poll方法相比,核心线程执行的take方法并不会超时,在获取到首个将要启动的任务前,核心线程会一直阻塞。

finishPoll方法

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

在成功获取任务后,DelayedWorkQueue的finishPoll方法会将任务移除队列,并以启动时间升序重排二叉树。

小结

DelayedWorkQueue内部维持了一个以任务启动时间升序排序的二叉树数组,启动时间最靠前的任务即数组的首个位置上的任务。核心线程通过take方法一直阻塞直到获取首个要启动的任务。非核心线程通过poll方法会在timeout时间内阻塞尝试获取首个要启动的任务,如果超过timeout未得到任务不会继续阻塞。

这里要特别说明要启动的任务指的是RunnableScheduledFuture内部的time减去当前时间小于等于0,未满足条件的任务不会被take或poll方法返回,这也就保证了未到指定时间任务不会执行。

执行ScheduledFutureTask

前面已经分析了schedule方法如何将RunnableScheduledFuture插入到DelayedWorkQueue,Worker内的线程如何获取定时任务。下面分析任务的执行过程,即ScheduledFutureTask的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

如果执行的是非周期型任务,调用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父类FutureTask的run方法。FutureTask的run方法已经在ThreadPoolExecutor篇分析过,这里不再多说。

如果执行的是周期型任务,则执行ScheduledFutureTask.super.runAndReset():

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

这个方法同run方法比较的区别是call方法执行后不设置结果,因为周期型任务会多次执行,所以为了让FutureTask支持这个特性除了发生异常不设置结果。

执行完任务后通过setNextRunTime方法计算下一次启动时间:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                //情况一
                time += p;
            else
                //情况二
                time = triggerTime(-p);
        }

        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }

还记得ScheduledThreadPoolExecutor执行定时任务的后两种scheduleAtFixedRate和scheduleWithFixedDelay。
scheduleAtFixedRate会执行到情况一,下一次任务的启动时间最早为上一次任务的启动时间加period。
scheduleWithFixedDelay会执行到情况二,这里很巧妙的将period参数设置为负数到达这段代码块,在此又将负的period转为正数。情况二将下一次任务的启动时间设置为当前时间加period。

然后将任务再次添加到任务队列:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

ScheduledFuture的get方法

既然ScheduledFuture的实现是ScheduledFutureTask,而ScheduledFutureTask继承自FutureTask,所以ScheduledFuture的get方法的实现就是FutureTask的get方法的实现,FutureTask的get方法的实现分析在ThreadPoolExecutor篇已经写过,这里不再叙述。要注意的是ScheduledFuture的get方法对于非周期任务才是有效的。

ScheduledThreadPoolExecutor总结

  • ScheduledThreadPoolExecutor是实现自ThreadPoolExecutor的线程池,构造方法中传入参数n,则最多会有n个核心线程工作,空闲的核心线程不会被自动终止,而是一直阻塞在DelayedWorkQueue的take方法尝试获取任务。构造方法传入的参数为0,ScheduledThreadPoolExecutor将以非核心线程工作,并且最多只会创建一个非核心线程,参考上文中ensurePrestart方法的执行过程。而这个非核心线程以poll方法获取定时任务之所以不会因为超时就被回收,是因为任务队列并不为空,只有在任务队列为空时才会将空闲线程回收,详见ThreadPoolExecutor篇的runWorker方法,之前我以为空闲的非核心线程超时就会被回收是不正确的,还要具备任务队列为空这个条件。
  • ScheduledThreadPoolExecutor的定时执行任务依赖于DelayedWorkQueue,其内部用可扩容的数组实现以启动时间升序的二叉树。
  • 工作线程尝试获取DelayedWorkQueue的任务只有在任务到达指定时间才会成功,否则非核心线程会超时返回null,核心线程一直阻塞。
  • 对于非周期型任务只会执行一次并且可以通过ScheduledFuture的get方法阻塞得到结果,其内部实现依赖于FutureTask的get方法。
  • 周期型任务通过get方法无法获取有效结果,因为FutureTask对于周期型任务执行的是runAndReset方法,并不会设置结果。周期型任务执行完毕后会重新计算下一次启动时间并且再次添加到DelayedWorkQueue中。

在源代码的分析过程中发现分析DelayedWorkQueue还需要有二叉树的升序插入算法的知识,一开始也没有认出来这种数据结构,后来又看了别人的文章才了解。这里比较难理解,有兴趣的同学可以参考《深度解析Java8 – ScheduledThreadPoolExecutor源码解析》。

编辑:编程技术 本文来源:出现编制程序

关键词: