澳门搏彩官方网 > Web前端 >

澳门苹果官网线程池之ScheduledThreadPoolExecutor调整原理

ScheduledThreadPoolExecutor 的调整原理主要依赖四个里头类,ScheduledFutureTask 和 DelayedWorkQueue:

博客链接:http://www.ideabuffer.cn/2017/04/14/深深掌握Java线程池:ScheduledThreadPoolExecutor/

  1. ScheduledFutureTask 是对职分的生龙活虎层封装,将大家提交的 Runnable 或 Callable 封装成具一时间周期的职责;
  2. DelayedWorkQueue 达成了对 ScheduledFutureTask 的推迟出队管理;

澳门电子游戏网址十大 1ScheduledFutureTask类图

介绍

自JDK1.5方始,JDK提供了ScheduledThreadPoolExecutor类来支撑周期性职分的调治。在这里在此之前的贯彻必要依附Timer和TimerTask恐怕其余第三方工具来产生。但Timer有过多的弱项:

  • Timer是单线程形式;
  • 若果在施行职责时期有个别TimerTask耗费时间较久,那么就能耳熏目染别的职务的调节;
  • Timer的职分调整是依附相对时间的,对系统时间灵活;
  • Timer不会捕获实施TimerTask时所抛出的百般,由于Timer是单线程,所以借使现身至极,则线程就能够停下,别的职分也得不到施行。

ScheduledThreadPoolExecutor世袭ThreadPoolExecutor来重用线程池的法力,它的落到实处况势如下:

  • 将职责封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的退换所影响;
  • ScheduledFutureTask实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有多少个基本点的法子:compareTo和getDelay。compareTo方法用于相比任务之间的开始的一段时期级关系,假诺离开下一次实行的光阴间距不够长,则优先级高;getDelay方法用于重临间隔下一次职责奉行时间的年华间隔;
  • ScheduledThreadPoolExecutor定义了三个DelayedWorkQueue,它是叁个稳步队列,会通过各类任务依据间距后一次试行时间间距的朗朗上口来排序;
  • ScheduledFutureTask世襲自FutureTask,能够经过重临Future对象来博取试行的结果。

经过如上的牵线,可以相比较一下Timer和ScheduledThreadPoolExecutor:

Timer ScheduledThreadPoolExecutor
单线程 多线程
单个任务执行时间影响其他任务调度 多线程,不会影响
基于绝对时间 基于相对时间
一旦执行任务出现异常不会捕获,其他任务得不到执行 多线程,单个任务的执行不会影响其他线程

故此,在JDK1.5随后,应该没什么理由三番五次选择Timer举办任务调治了。

ScheduledFutureTask有以下三种布局方法:

ScheduledThreadPoolExecutor的使用

上面用四个实际的事例来阐明ScheduledThreadPoolExecutor的应用:

public class ScheduledThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        // 创建大小为5的线程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

        for (int i = 0; i < 3; i++) {
            Task worker = new Task("task-" + i);
            // 只执行一次
//          scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
            // 周期性执行,每5秒执行一次
            scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS);
        }

        Thread.sleep(10000);

        System.out.println("Shutting down executor...");
        // 关闭线程池
        scheduledThreadPool.shutdown();
        boolean isDone;
        // 等待线程池终止
        do {
            isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("awaitTermination...");
        } while(!isDone);

        System.out.println("Finished all threads");
    }


}


class Task implements Runnable {

    private String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("name = " + name + ", startTime = " + new Date());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("name = " + name + ", endTime = " + new Date());
    }

}

上面就来具体分析一下ScheduledThreadPoolExecutor的完毕进度。

ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement();}ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement();}ScheduledFutureTask(Callable<V> callable, long ns) { super; this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement();}

ScheduledThreadPoolExecutor的实现

super 中调用 FutureTask 的布局方法,能够仿照效法FutureTask完成原理。ScheduledFutureTask 首要配置参数如下:

ScheduledThreadPoolExecutor的类协会

看下ScheduledThreadPoolExecutor内部的类图:

决不被如此多类吓到,这里只然而是为着更通晓的刺探ScheduledThreadPoolExecutor有关调治和队列的接口。

ScheduledThreadPoolExecutor世袭自ThreadPoolExecutor,完成了ScheduledExecutorService接口,该接口定义了schedule等任务调节的法子。

与此同有时间ScheduledThreadPoolExecutor有七个至关心爱戴要的当中类:DelayedWorkQueue和ScheduledFutureTask。能够见见,DelayeddWorkQueue是三个拥塞队列,而ScheduledFutureTask继承自FutureTask,并且实现了Delayed接口。有关FutureTask的介绍请参见另风流倜傥篇文章:FutureTask源码深入分析。

名称 含义
time 任务能够执行的时间点(单位:nanoTime )
period 正值表示固定时间周期执行。 负值表示固定延迟周期执行。 0表示非重复任务。
sequenceNumber FIFO调度序列值(用 AtomicLong 实现)

ScheduledThreadPoolExecutor的布局方法

ScheduledThreadPoolExecutor有3中结构方法:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

因为ScheduledThreadPoolExecutor世襲自ThreadPoolExecutor,所以那边都以调用的ThreadPoolExecutor类的布局方法。有关ThreadPoolExecutor能够参照他事他说加以调查深远精通Java线程池:ThreadPoolExecutor。

此处注意传入的不通队列是DelayedWorkQueue类型的靶子。后边会详细介绍。

静心:period 大于 0 或 小于 0 时,都以周期性推行的,只是实行时间规律不周边。

schedule方法

在上文的事例中,使用了schedule方法来进展职责调整,schedule方法的代码如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}


public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

第少年老成,这里的多个重载的schedule方法只是流传的率先个参数分裂,能够是Runnable对象或许Callable对象。会把传播的职务封装成三个RunnableScheduledFuture对象,其实也正是ScheduledFutureTask对象,decorateTask暗许什么意义都未曾做,子类能够重写该方法:

/**
 * 修改或替换用于执行 runnable 的任务。此方法可重写用于管理内部任务的具体类。默认实现只返回给定任务。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

/**
 * 修改或替换用于执行 callable 的任务。此方法可重写用于管理内部任务的具体类。默认实现只返回给定任务。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

接下来,通过调用delayedExecute方法来延时推行职责。
末尾,重临叁个ScheduledFuture对象。

ScheduledFutureTask 的主要调度帮忙方法如下:

scheduleAtFixedRate方法

该措施设置了推行周期,下一遍实践时间一定于是上叁遍的实施时间累计period,它是选用已定位的频率来实行职务:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
// 任务的延迟执行时间public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS);}//实现任务的排序,执行时间越小越靠前,相同则按照队列FIFO顺序public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) // 时间一样时,按照FIFO的顺序 return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}// 是否是周期性任务public boolean isPeriodic() { return period != 0;}// 设置下一次运行时间private void setNextRunTime() { long p = period; if  time += p; // 按固定时间周期,下次执行时间为上次执行时间 + 周期时间 else time = triggerTime; // 按固定延时周期,下次执行时间为当前时间 + 延时时间}

scheduleWithFixedDelay方法

该形式设置了进行周期,与scheduleAtFixedRate方法分歧的是,下叁次施行时间是上贰次任务试行完的种类时间累计period,由此具体进行时间不是平素的,但周期是原则性的,是应用相对固定的延期来执行职务:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

留意这里的unit.toNanos(-delay));,这里把周期设置为负数来代表是周旋固化的延期实施。

scheduleAtFixedRate和scheduleWithFixedDelay的分歧在setNextRunTime方法中就能够看出来:

private void setNextRunTime() {
    long p = period;
    // 固定频率,上次执行时间加上周期时间
    if (p > 0)
        time += p;
    // 相对固定延迟执行,使用当前系统时间加上周期时间
    else
        time = triggerTime(-p);
}

setNextRunTime方法会在run方法中实施完职务后调用。

澳门苹果官网线程池之ScheduledThreadPoolExecutor调整原理。核心 run 方法

triggerTime方法

trigger提姆e方法用于获取下二遍执行的现实时刻:

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}


long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这里的delay < (Long.MAX_VALUE >> 1是为了认清是不是要有备无患Long类型溢出,如若delay的值紧跟于Long类型最大值的一半,则从来回到delay,不然必要展开防备溢出管理。

public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState) // 判断是否可以运行任务 cancel; // 取消任务,移除队列 else if (!periodic) // 非周期性任务 直接调用父类 FutureTask 的 run 方法 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset { // 周期性任务,调用父类 runAndReset 方法,返回是否执行成功 // 执行成功后继续设置下一次运行时间 setNextRunTime(); // 重新执行周期性任务(可能因为线程池运行状态的改变而被拒绝) reExecutePeriodic(outerTask); }}

overflowFree方法

该方法的效果与利益是限量队列中负有节点的延迟时间在Long.MAX_VALUE之内,防止在compareTo方法中溢出。

private long overflowFree(long delay) {
    // 获取队列中的第一个节点
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        // 获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);
        // 如果延迟时间小于0,并且 delay - headDelay 超过了Long.MAX_VALUE
        // 将delay设置为 Long.MAX_VALUE + headDelay 保证delay小于Long.MAX_VALUE
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}

当叁个任务已经足以进行出队操作,但还并未有施行,或许出于线程池中的工作线程不是悠闲的。具体剖析一下这种景色:

  • 为了便于表达,即使Long.MAX_VALUE=1023,也正是10位,并且当前的年华是100,调用triggerTime时并不曾对delay进行推断,而是一直再次回到了now() + delay,也正是生龙活虎对生龙活虎于100 + 1023,那早晚是溢出了,那么重返的光阴是-925;
  • 风流倜傥经头节点已经能够出队可是还从未推行出队,那么头节点的实践时间应该是低于当前光阴的,若是是95;
  • 那个时候调用offer方法向队列中加上任务,在offer方法中会调用siftUp方法来排序,在siftUp方法实践时又会调用ScheduledFutureTask中的compareTo方法来相比较实行时间;
  • 那时假诺施行到了compareTo方法中的long diff = time - x.time;时,那么合算后的结果便是-925 - 95 = -1020,那么将回到-1,而符合规律情形应当是回来1,因为新插手的职分的推行时间要比头结点的实行时间要晚,那就不是大家想要的结果了,那会诱致队列中的顺序不精确。
  • 同理也能够算一下在奉行compareTo方法中的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);时也是有这种情景;
  • 就此在triggerTime方法中对delay的大大小小做了判别,正是为着防范这种意况时有产生。

借使执行了overflowFree方法吗,那时候headDelay = 95 - 100 = -5,然后实践delay = 1023 + (-5) = 1018,那么triggerTime会返回100 + 1018 = -930,再举行compareTo方法中的long diff = time - x.time;时,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023,未有溢出,适合常规的预想。

进而,overflowFree方法中把早就晚点的部分时间给减去,正是为了制止在compareTo方法中现身溢出处境。

(说真的,这段代码看的异常痛心,日常情况下也不会生出这种景色,何人会传二个Long.MAX_VALUE呢。要知道Long.MAX_VALUE的飞秒数换算成年的话是292年,何人会那样无聊。。。)

对此周期性任务,在 run 方法中进行成功后会继续设置下二次推行时间,并把职分出席延时队列。但需注意,如若义务实行停业,将不会再被周期性调用。所以在只怕进行停业的周期性职责中,必需抓牢丰裕管理。

ScheduledFutureTask的getDelay方法

public long getDelay(TimeUnit unit) {
    // 执行时间减去当前系统时间
    return unit.convert(time - now(), NANOSECONDS);
}

DelayedWorkQueue 是三个延时有序队列,内部使用 数组 维护队列成分,选择 堆排序 的沉凝保险队列顺序,并在队列成分(ScheduledFutureTask)创立目录,支持高效删除。

ScheduledFutureTask的布局方法

ScheduledFutureTask世襲自FutureTask并完成了RunnableScheduledFuture接口,具体能够参考上文的类图,构造方法如下:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

/**
 * Creates a periodic action with given nano time and period.
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

这里面有多少个重要的习性,下边来解释一下:

  • time:后一次任务试行时的时间;
  • period:推行周期;
  • sequenceNumber:保存职务被加多到ScheduledThreadPoolExecutor中的序号。

在schedule方法中,创制完ScheduledFutureTask对象之后,会执行delayedExecute方法来实行职责。

瞩目:DelayedWorkQueue 的全套队列不是一丝一毫有序的,只保险成分有序出队。

delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池已经关闭,使用拒绝策略拒绝任务
    if (isShutdown())
        reject(task);
    else {
        // 添加到阻塞队列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 确保线程池中至少有一个线程启动,即使corePoolSize为0
            // 该方法在ThreadPoolExecutor中实现
            ensurePrestart();
    }
}

说一下这里的第3个if推断:

  1. 假诺不是SHUTDOWN状态,实践else,否则实施步骤2;
  2. 假使在日前线程池运转景况下能够实践任务,实行else,不然实践步骤3;
  3. 从绿灯队列中删去职责,假若退步,履行else,不然实施步骤4;
  4. 撤除任务,但不间歇施行中的职责。

对于步骤2,能够透过set孔蒂nueExistingPeriodicTasksAfterShutdownPolicy方法设置在线程池关闭时,周期任务继续实践,默感到false,也就是线程池关闭时,不再推行周期义务。

ensurePrestart方法在ThreadPoolExecutor中定义:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

调用了addWorker方法,可以在浓烈了然Java线程池:ThreadPoolExecutor中查看addWorker方法的牵线,线程池中的职业线程是通过该方法来运转并奉行义务的。

澳门电子游戏网址十大 2DelayedWorkQueue类图

ScheduledFutureTask的run方法

追思一下线程池的实践进程:当线程池中的专门的学问线程运营时,不断地从绿灯队列中收取职务并施行,当然,抽出的职责贯彻了Runnable接口,所以是经过调用职责的run方法来实践义务的。

这边的职务项目是ScheduledFutureTask,所以上边看一下ScheduledFutureTask的run方法:

public void run() {
    // 是否是周期性任务
    boolean periodic = isPeriodic();
    // 当前线程池运行状态下如果不可以执行任务,取消该任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果不是周期性任务,调用FutureTask中的run方法执行
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任务,调用FutureTask中的runAndReset方法执行
    // runAndReset方法不会设置执行结果,所以可以重复执行任务
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 计算下次执行该任务的时间
        setNextRunTime();
        // 重复执行任务
        reExecutePeriodic(outerTask);
    }
}

关于FutureTask的run方法和runAndReset方法,能够参照FutureTask源码剖判。

解析一下进行进度:

  1. 如果当前线程池运生势况不得以进行任务,撤销该任务,然后径直回到,不然实践步骤2;
  2. 假设不是周期性职分,调用FutureTask中的run方法实施,会安装进行结果,然后径直回到,不然实践步骤3;
  3. 只倘使周期性职责,调用FutureTask中的runAndReset方法实践,不会设置进行结果,然后径直再次回到,不然实行步骤4和步骤5;
  4. 计量下次施行该职务的具体时刻;
  5. 再次实践职责。

上面详细批注 DelayedWorkQueue 的完结:

ScheduledFutureTask的reExecutePeriodic方法

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

该措施和delayedExecute方法相同,分裂的是:

  1. 出于调用reExecutePeriodic方法时早就实施过一回周期性义务了,所以不会reject当前职分;
  2. 传扬的职分一定是周期性职分。

骨干入队方法:

onShutdown方法

onShutdown方法是ThreadPoolExecutor中的钩子方法,在ThreadPoolExecutor中怎么着都未曾做,参考深远精晓Java线程池:ThreadPoolExecutor,该情势是在实践shutdown方法时被调用:

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // 获取在线程池已 shutdown 的情况下是否继续执行现有延迟任务
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 获取在线程池已 shutdown 的情况下是否继续执行现有定期任务
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果在线程池已 shutdown 的情况下不继续执行延迟任务和定期任务
    // 则依次取消任务,否则则根据取消状态来判断
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                // 如果有在 shutdown 后不继续的延迟任务或周期任务,则从队列中删除并取消任务
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}
public boolean add(Runnable e) { return offer;}public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); // 队列扩容 类似 ArrayList 扩容 size = i + 1; if  { // 队列为空,直接加入 queue[0] = e; setIndex; // 设置元素在队列的索引,即告诉元素自己在队列的第几位 } else { siftUp; // 放入适当的位置 } if (queue[0] == e) { leader = null; // 等待队列头的线程 available.signal(); // 通知 } } finally { lock.unlock(); } return true;}

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要团结达成窒碍的做事行列,是因为ScheduledThreadPoolExecutor必要的行事行列有些特别。

DelayedWorkQueue是一个基于堆的数据构造,相像于DelayQueue和PriorityQueue。在实践准期义务的时候,每种任务的推行时间都不如,所以DelayedWorkQueue的劳作正是固守施行时间的升序来排列,试行时间隔开分离当前时刻越近的天职在队列的前头(注意:这里的次第并非纯属的,堆中的排序只保障了子节点的后一次实施时间要比父节点的后一次试行时间要大,而叶子节点之间并不一定是逐意气风发的,下文中会表明)。

堆布局如下图所示:

堆结构

看得出,DelayedWorkQueue是贰个依据最小堆布局的行列。堆构造得以行使数组表示,能够转变到如下的数组:

在此种协会中,能够窥见好似下特点:

借使,索引值从0最初,子节点的索引值为k,父节点的索引值为p,则:

  1. 一个节点的左子节点的目录为:k = p * 2 + 1;
  2. 一个节点的右子节点的目录为:k = (p + 1卡塔尔 * 2;
  3. 八个节点的父节点的目录为:p = (k - 1卡塔尔 / 2。

何以要运用DelayedWorkQueue呢?

定期任务实施时需求抽出近期要实行的职分,所以职分在队列中年老年是出队时必然若是时下队列中试行时间最靠前的,所以本来要使用优先级队列。

DelayedWorkQueue是二个预先级队列,它能够确定保证每回出队的职责都以现阶段队列中进行时间最靠前的,由于它是基于堆构造的系列,堆构造在推行插入和删除操作时的最坏时间复杂度是 O(logN)

入队方式中最首要的是 siftUp 方法, sift 在立陶宛共和国语单词中是 的意思,这里可将 siftUp 领悟为向前筛,找到适当的 堆排序点 加进去。

DelayedWorkQueue的属性

// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根据初始容量创建RunnableScheduledFuture类型的数组
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

// leader线程
private Thread leader = null;
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();

留意这里的leader,它是Leader-Follower方式的变体,用于裁减不供给的准时等待。什么看头啊?对于四线程的网络模型来讲:

抱有线程会有两种身份中的生龙活虎种:leader和follower,以致贰个做事中的状态:proccesser。它的基本法规就是,长久最多独有二个leader。而颇负follower都在伺机成为leader。线程池运转时会自动发出一个Leader负担等待网络IO事件,当有一个轩然大波时有发生时,Leader线程首先通报二个Follower线程将其提拔为新的Leader,然后本人就去做事了,去管理那么些互联网事件,管理达成后加盟Follower线程等待队列,等待下一次成为Leader。这种办法能够压实CPU高速缓存相仿性,及解除动态内部存款和储蓄器分配和线程间的数据沟通。

参考自:http://blog.csdn.net/goldlevi/article/details/7705180

具体leader的机能在剖判take方法时再详尽介绍。

private void siftUp(int k, RunnableScheduledFuture<?> key) { while  { int parent =  >>> 1; // /2 RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo >= 0) break; queue[k] = e; setIndex; k = parent; } queue[k] = key; setIndex;}

offer方法

既然是拥塞队列,入队的操作如add和put方法都调用了offer方法,上边查看一下offer方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // queue是一个RunnableScheduledFuture类型的数组,如果容量不够需要扩容
        if (i >= queue.length)
            grow();
        size = i + 1;
        // i == 0 说明堆中还没有数据
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
        // i != 0 时,需要对堆进行重新排序
            siftUp(i, e);
        }
        // 如果传入的任务已经是队列的第一个节点了,这时available需要发出信号
        if (queue[0] == e) {
            // leader设置为null为了使在take方法中的线程在通过available.signal();后会执行available.awaitNanos(delay);
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

至于Condition的介绍请参见长远驾驭AbstractQueuedSynchronizer(三)

此地的主要性是siftUp方法。

siftUp 首要思想是将猛增的天职与前 /2 的任务比较,假诺任务试行时间较近者替交换一下地点置 /2。依次往前比较,直到无替换产生。每一回新增加成分调用 siftUp 仅能承保首个因素是细微的。整个队列不自然有序:

澳门苹果官网,siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 找到父节点的索引
        int parent = (k - 1) >>> 1;
        // 获取父节点
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
        if (key.compareTo(e) >= 0)
            break;
        // 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
        queue[k] = e;
        // 设置索引为k
        setIndex(e, k);
        k = parent;
    }
    // key设置为排序后的位置中
    queue[k] = key;
    setIndex(key, k);
}

代码很好驾驭,正是循环的依靠key节点与它的父节点来判断,如若key节点的实行时间低于父节点,则将四个节点调换,使实践时间靠前的节点排列在队列的前边。

设若新入队的节点的延迟时间(调用getDelay(卡塔尔(قطر‎方法赢得)是5,奉行进程如下:

  1. 先将新的节点加多到数组的尾巴,这时候新节点的索引k为7:澳门电子游戏网址十大,

  2. 算算新父节点的目录:parent = (k - 1卡塔尔 >>> 1,parent = 3,那么queue[3]的时光间距值为8,因为 5 < 8 ,将施行queue[7] = queue[3]:

  3. 那时将k设置为3,继续循环,再度计算parent为1,queue[1]的时刻间距为3,因为 5 > 3 ,这时候退出循环,最终k为3:

可以预知,每便新扩充节点时,只是依据父节点来判别,而不会影响兄弟节点。

别的,setIndex方法只是安装了ScheduledFutureTask中的heapIndex属性:

private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        ((ScheduledFutureTask)f).heapIndex = idx;
}