前言scheduledthreadpoolexecutor可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池scheduledthreadpoolexecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。
实现机制分析我们先思考下,如果让大家去实现scheduledthreadpoolexecutor可以周期性执行任务的功能,需要考虑哪些方面呢?
scheduledthreadpoolexecutor的整体实现思路是什么呢?答:我们是不是可以继承线程池类,按照线程池的思路,将任务先丢到阻塞队列中,等到时间到了,工作线程就从阻塞队列获取任务执行。
如何实现等到了未来的时间点就开始执行呢?答:我们可以根据参数获取这个任务还要多少时间执行,那么我们是不是可以从阻塞队列中获取任务的时候,通过条件队列的的awaitnanos(delay)方法,阻塞一定时间。
如何实现 任务的重复性执行呢?答:这就更加简单了,任务执行完成后,把它再次加入到队列不就行了吗。
源码解析类结构图
scheduledthreadpoolexecutor的类结构图如上图所示,很明显它是在我们的线程池threadpoolexecutor框架基础上扩展的。
scheduledexecutorservice:实现了该接口,封装了调度相关的apithreadpoolexecutor:继承了该类,保留了线程池的能力和整个实现的框架delayedworkqueue:内部类,延迟阻塞队列。scheduledfuturetask:延迟任务对象,包含了任务、任务状态、剩余的时间、结果等信息。重要属性通过scheduledthreadpoolexecutor类的成员属性,我们可以了解它的数据结构。
shutdown 后是否继续执行周期任务(重复执行)private volatile boolean continueexistingperiodictasksaftershutdown;shutdown 后是否继续执行延迟任务(只执行一次)private volatile boolean executeexistingdelayedtasksaftershutdown = true;调用cancel()方法后,是否将该任务从队列中移除,默认falseprivate volatile boolean removeoncancel = false;任务的序列号,保证fifo队列的顺序,用来比较优先级private static final atomiclong sequencer = new atomiclong()scheduledfuturetask延迟任务类scheduledfuturetask 继承 futuretask,实现 runnablescheduledfuture 接口,无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 scheduledfuturetask。该类具有延迟执行的特点, 覆盖 futuretask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 futuretask#run,而对于周期性任务则调用futuretask#runandreset 并且在成功之后根据 fixed-delay/fixed-rate模式来设置下次执行时间并重新将任务塞到工作队列。成员属性如下:// 任务序列号private final long sequencenumber;// 任务可以被执行的时间,交付时间,以纳秒表示private long time; // 0 表示非周期任务// 正数表示 fixed-rate(两次开始启动的间隔)模式的周期,// 负数表示 fixed-delay(一次执行结束到下一次开始启动) 模式private final long period; // 执行的任务对象runnablescheduledfuturedelayedworkqueue延迟队列delayedworkqueue 是支持延时获取元素的阻塞队列, 内部采用优先队列 priorityqueue(小根堆、满二叉树)存储元素。内部数据结构是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常。成员属性如下:// 初始容量private static final int initial_capacity = 16; // 节点数量private int size = 0;// 存放任务的数组private runnablescheduledfuture?[] queue = new runnablescheduledfuture?[initial_capacity]; // 控制并发用的锁private final reentrantlock lock = new reentrantlock(); // 条件队列private final condition available = lock.newcondition();//指定用于等待队列头节点任务的线程private thread leader = null;提交延迟任务schedule()原理延迟执行方法,并指定延迟执行的时间,只会执行一次。
schedule()方法是延迟任务方法的入口。public scheduledfuture? schedule(runnable command, long delay, timeunit unit) { // 判空处理 if (command == null || unit == null) throw new nullpointerexception(); // 将外部传入的任务封装成延迟任务对象scheduledfuturetask runnablescheduledfuture? t = decoratetask(command, new scheduledfuturetask(command, null, triggertime(delay, unit))); // 执行延迟任务 delayedexecute(t); return t;}decoratetask(...) 该方法是封装延迟任务调用triggertime(delay, unit)方法计算延迟的时间。// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间private long triggertime(long delay, timeunit unit) { // 设置触发的时间 return triggertime(unit.tonanos((delay < 0) ? 0 : delay));}long triggertime(long delay) { // 如果 delay 1)) ? delay : overflowfree(delay));}// 下面这种情况很少,大家看不懂可以不用强行理解// 如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。// 阻塞队列中维护任务顺序是基于 compareto 比较的,比较两个任务的顺序会用 time 相减。// 那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareto 产生错误的结果private long overflowfree(long delay) { delayed head = (delayed) super.getqueue().peek(); if (head != null) { long headdelay = head.getdelay(nanoseconds); // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出 // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱 // 不然就把当前 delay 值给调整为 long.max_value + 队首 delay if (headdelay < 0 && (delay - headdelay < 0)) delay = long.max_value + headdelay; } return delay;}调用runnablescheduledfuture的构造方法封装为延迟任务scheduledfuturetask(runnable r, v result, long ns) { super(r, result); // 任务的触发时间 this.time = ns; // 任务的周期, 延迟任务的为0,因为不需要重复执行 this.period = 0; // 任务的序号 + 1 this.sequencenumber = sequencer.getandincrement();}调用decoratetask()方法装饰延迟任务// 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展protected提交周期任务scheduleatfixedrate()原理按照固定的频率周期性的执行任务,捕手renwu,一次任务的启动到下一次任务的启动的间隔
public scheduledfuture? scheduleatfixedrate(runnable command, long initialdelay, long period, timeunit unit) { if (command == null || unit == null) throw new nullpointerexception(); if (period <= 0) throw new illegalargumentexception(); // 任务封装,【指定初始的延迟时间和周期时间】 scheduledfuturetask sft =new scheduledfuturetask(command, null, triggertime(initialdelay, unit), unit.tonanos(period)); // 默认返回本身 runnablescheduledfuture t = decoratetask(command, sft); sft.outertask = t; // 开始执行这个任务 delayedexecute(t); return t;}提交周期任务schedulewithfixeddelay()原理按照指定的延时周期性执行任务,上一个任务执行完毕后,延时一定时间,再次执行任务。
public scheduledfuture? schedulewithfixeddelay(runnable command, long initialdelay, long delay, timeunit unit) { if (command == null || unit == null) throw new nullpointerexception(); if (delay <= 0) throw new illegalargumentexception(); // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式 scheduledfuturetask sft = new scheduledfuturetask(command, null, triggertime(initialdelay, unit), unit.tonanos(-delay)); runnablescheduledfuture t = decoratetask(command, sft); sft.outertask = t; // 开始执行这个任务 delayedexecute(t); return t;}执行任务delayedexecute(t)原理上面多种提交任务的方式,殊途同归,最终都会调用delayedexecute()方法执行延迟或者周期任务。
delayedexecute()方法是执行延迟任务的入口private void delayedexecute(runnablescheduledfuture? task) { // 线程池是 shutdown 状态,执行拒绝策略 if (isshutdown()) // 调用拒绝策略的方法 reject(task); else { // 把当前任务放入阻塞队列 super.getqueue().add(task); // 线程池状态为 shutdown 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态 // 非主流程,可以跳过,不重点看了 if (isshutdown() && !canrunincurrentrunstate(task.isperiodic()) && remove(task)) task.cancel(false); else // 开始执行了哈 ensureprestart(); }}ensureprestart()方法开启线程执行// threadpoolexecutor#ensureprestartvoid ensureprestart() { int wc = workercountof(ctl.get()); // worker数目小于corepoolsize,则添加一个worker。 if (wc < corepoolsize) // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumpoolsize addworker(null, true); // corepoolsize = 0的情况,至少开启一个线程,【担保机制】 else if (wc == 0) addworker(null, false);}addworker()方法实际上父类threadpoolexecutor的方法,这个方法在该文章 java线程池源码深度解析中详细介绍过,这边做个总结:
如果线程池中工作线程数量小于最大线程数,创建工作线程,执行任务。如果线程池中工作线程数量大于最大线程数,直接返回。获取延迟任务take()原理目前工作线程已经创建好了,工作线程开始工作了,它会从阻塞队列中获取延迟任务执行,这部分也是线程池里面的原理,不做展开,那我们看下它是如何实现延迟执行的? 主要关注如何从阻塞队列中获取任务。
delayedworkqueue#take()方法获取延迟任务该方法会在上面的addwoker()方法创建工作线程后,工作线程中循环持续调用workqueue.take()方法获取延迟任务。该方法主要获取延迟队列中任务延迟时间小于等于0 的任务。如果延迟时间不小于0,那么调用条件队列的awaitnanos(delay)阻塞方法等待一段时间,等时间到了,延迟时间自然小于等于0了。获取到任务后,工作线程就可以开始执行调度任务了。// delayedworkqueue#take()public runnablescheduledfuture? take() throws interruptedexception { final reentrantlock lock = this.lock; // 加可中断锁 lock.lockinterruptibly(); try { // 自旋 for (;;) { // 获取阻塞队列中的头结点 runnablescheduledfuture? first = queue[0]; // 如果阻塞队列没有数据,为空 if (first == null) // 等待队列不空,直至有任务通过 offer 入队并唤醒 available.await(); else { // 获取头节点的的任务还剩余多少时间才执行 long delay = first.getdelay(nanoseconds); if (delay <= 0) // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部 return finishpoll(first); // 逻辑到这说明头节点的延迟时间还没到 first = null; // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待 if (leader != null) // 当前线程阻塞 available.await(); else { // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】 thread thisthread = thread.currentthread(); leader = thisthread; try { // 当前线程通过awaitnanos方法等待delay时间后,会自动唤醒,往后面继续执行 available.awaitnanos(delay); // 到达阻塞时间时,当前线程会从这里醒来,进入下一轮循环,就有可能执行了 } finally { // t堆顶更新,leader 置为 null,offer 方法释放锁后, // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。 if (leader == thisthread) // leader 置为 null 用以接下来判断是否需要唤醒后继线程 leader = null; } } } } } finally { // 没有 leader 线程并且头结点不为 null,唤醒阻塞获取头节点的线程, // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】 if (leader == null && queue[0] != null) available.signal(); // 解锁 lock.unlock(); }}finishpoll()方法获取到任务后执行该方法主要做两个事情, 获取头节点并调整堆,重新选择延迟时间最小的节点放入头部。
private runnablescheduledfuture? finishpoll(runnablescheduledfuture? f) { // 获取尾索引 int s = --size; // 获取尾节点 runnablescheduledfuture? x = queue[s]; // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调 queue[s] = null; // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情 if (s != 0) // 从索引处 0 开始向下调整 siftdown(0, x); // 出队的元素索引设置为 -1 setindex(f, -1); return f;}延迟任务运行的原理从延迟队列中获取任务后,工作线程会调用延迟任务的run()方法执行任务。
scheduledfuturetask#run()方法运行任务调用isperiodic()方法判断任务是否是周期性任务还是非周期性任务如果任务是非周期任务,就调用父类的futuretask#run()执行一次如果任务是非周期任务,就调用父类的futuretask#runandreset(), 返回true会设置下一次的执行时间,重新放入线程池的阻塞队列中,等待下次获取执行public void run() { // 是否周期性,就是判断 period 是否为 0 boolean periodic = isperiodic(); // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务 if (!canrunincurrentrunstate(periodic)) cancel(false); // 非周期任务,直接调用 futuretask#run 执行一次 else if (!periodic) scheduledfuturetask.super.run(); // 周期任务的执行,返回 true 表示执行成功 else if (scheduledfuturetask.super.runandreset()) { // 设置周期任务的下一次执行时间 setnextruntime(); // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程 reexecuteperiodic(outertask); }}futuretask#runandreset()执行周期性任务周期任务正常完成后任务的状态不会变化,依旧是 new,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setexception 方法将任务状态置为异常,把异常保存在 outcome 中。方法返回 false,后续的该任务将不会再周期的执行protected boolean runandreset() { // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return false; boolean ran = false; int s = state; try { callablescheduledfuturetask#setnextruntime()设置下次执行时间如果属性period大于0,表示fixed-rate模式,直接加上period时间即可。如果属性period小于等于0, 表示是fixed-delay模式, 调用triggertime重新计算下次时间。// 任务下一次的触发时间private void setnextruntime() { long p = period; if (p > 0) // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差 time += p; else // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】 time = triggertime(-p);}scheduledfuturetask#reexecuteperiodic(),重新放入阻塞任务队列,等待获取,进行下一轮执行// scheduledthreadpoolexecutor#reexecuteperiodicvoid reexecuteperiodic(runnablescheduledfuture? task) { if (canrunincurrentrunstate(true)) { // 【放入任务队列】 super.getqueue().add(task); // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行, // 如果不能执行且任务还在队列中未被取走,则取消任务 if (!canrunincurrentrunstate(true) && remove(task)) task.cancel(false); else // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】 ensureprestart(); }}
微型虚拟示波器的设计与实现
分析一下电子计数继电器接线方法
软体机器人受欢迎的原因
不止于“芯”,共绘STM32创新之旅 | STM32全国研讨会圆满收官
校园戏曲新花样,机器人来做评委
调度线程池ScheduledThreadPoolExecutor源码解析
论欧美各国三元材料企业小盘点
vivo手机,camera &music已经融合在一起了
Java中volatile的作用以及用法
NLA自然语言分析,助力解决数据分析的难题
苹果正式发邀请函!iphone8发布会倒计时:9月22日iPhone8上市开售,国行价格8188元,肾好疼啊
特斯拉股价大跌 市值缩水至约660亿美元
典型的单稳态电路分享
拜登政府誓要大力行动解决供应短缺问题
基于CAN总线通讯技术实现汽车四轮独立驱动控制系统的设计
国家再出支持集成电路产业和软件产业的政策,对五种情形免征进口关税
智能门锁行业洗牌千家混战 或成未来的新家电
5G资讯:5G手机与流量同样都消费的起 北京移动成立5G产业联盟
FPGA+STM32基于模型设计开发手册正式版发布
华为麦芒8全面评测 这次偏重了拍照方面