1、futuretask 对象介绍
future 对象大家都不陌生,是 jdk1.5 提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。 在 java 中想要通过线程执行一个任务,离不开 runnable 与 callable 这两个接口。 runnable 与 callable 的区别在于,runnable 接口只有一个 run 方法,该方法用来执行逻辑,但是并没有返回值;而 callable 的 call 方法,同样用来执行业务逻辑,但是是有一个返回值的。
callable 执行任务过程中可以通过 futuretask 获得任务的执行状态,并且可以在执行完成后通过 future.get () 方式获取执行结果。 future 是一个接口,而 futuretask 就是 future 的实现类。并且 futuretask 实现了 runnablefuture(runnable + future),说明我们可以创建一个 futuretask 并直接把它放到线程池执行,然后获取 futuretask 的执行结果。
2、futuretask 源码解析
2.1 主要方法和属性
那么 futuretask 是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下 futuretask 中的属性。
// futuretask的状态及其常量privatevolatileint state; privatestaticfinalint new =0; privatestaticfinalint completing =1; privatestaticfinalint normal =2; privatestaticfinalint exceptional =3; privatestaticfinalint cancelled =4; privatestaticfinalint interrupting =5; privatestaticfinalint interrupted =6; // callable对象,执行完后置空 privatecallable callable; // 要返回的结果或要引发的异常来自 get() 方法 privateobject outcome;// non-volatile, protected by state reads/writes // 执行callable的线程 privatevolatilethread runner; // 等待线程的一个链表结构 privatevolatilewaitnode waiters;
futuretask 中几个比较重要的方法。
// 取消任务的执行booleancancel(boolean mayinterruptifrunning);// 返回任务是否已经被取消booleaniscancelled();// 返回任务是否已经完成,任务状态不为new即为完成booleanisdone();// 通过get方法获取任务的执行结果vget()throwsinterruptedexception,executionexception;// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常vget(long timeout,timeunit unit) throwsinterruptedexception,executionexception,timeoutexception;
2.2 futuretask 执行
当我们在线程池中执行一个 callable 方法时,其实是将 callable 任务封装成一个 runnablefuture 对象去执行,同时将这个 runnablefuture 对象返回,这样我们就拿到了 futuretask 的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。 以下为 threadpoolexecutor 线程池提交一个 callable 方法的源码。
public future submit(callable task){ if(task ==null)thrownewnullpointerexception(); runnablefuture ftask =newtaskfor(task); execute(ftask); return ftask; }protected runnablefuture newtaskfor(callable callable){ returnnewfuturetask(callable); }
2.3 run 方法介绍
runnablefuture 其实也是一个可以执行的 runnable,我们看下他的 run 方法。其主要流程就是执行 call 方法,正常执行完毕后将 result 结果赋值到 outcome 属性上。
publicvoidrun(){ if(state != new || !unsafe.compareandswapobject(this, runneroffset, null,thread.currentthread())) return; try{ // 将callable赋值到本地变量 callable c = callable; // 判断callable不为空并且futuretask的状态必须为新创建 if(c !=null&& state == new){ v result; boolean ran; try{ // 执行call方法(用户自己实现的call逻辑),并获取到result结果 result = c.call(); ran =true; }catch(throwable ex){ result =null; ran =false; // 如果执行过程出现异常,则将异常对象赋值到outcome上 setexception(ex); } // 如果正常执行完毕,则将result赋值到outcome属性上 if(ran) set(result); } }finally{ // runner must be non-null until state is settled to // prevent concurrent calls to run() runner =null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if(s >= interrupting) handlepossiblecancellationinterrupt(s); } }
以下逻辑为正常执行完成后赋值的逻辑。
// 如果任务没有被取消,将future执行完的返回值赋值给result结果// futuretask任务的执行状态是通过cas的方式进行赋值的,并且由此可知,completing其实是一个瞬时状态// 当将线程执行结果赋值给outcome后,状态会修改为对应的normal,即正常结束protectedvoidset(v v){ if(unsafe.compareandswapint(this, stateoffset, new, completing)){ outcome = v; unsafe.putorderedint(this, stateoffset, normal);// final state finishcompletion(); } }
以下为执行异常时赋值逻辑,直接将 throwable 对象赋值到 outcome 属性上。
protectedvoidsetexception(throwable t){ if(unsafe.compareandswapint(this, stateoffset, new, completing)){ outcome = t; unsafe.putorderedint(this, stateoffset, exceptional);// final state finishcompletion(); } }无论是正常执行还是异常执行,最终都会调用一个 finishcompletion 方法,用来做工作的收尾工作。
2.4 get 方法介绍
future 的 get 方法有两个重载的方法,一个是 get () 获取结果,一个是 get (long, timeunit) 带有超时时间的获取结果,我们看下 futuretask 中的这两个方法是如何实现的。
// 不带有超时时间,一直阻塞直到获取结果publicvget()throwsinterruptedexception,executionexception{ int s = state; if(s <= completing) // 等待结果完成,带有超时的get方法也是调用的awaitdone方法 s =awaitdone(false,0l); // 返回结果 returnreport(s); }// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常publicvget(long timeout,timeunit unit) throwsinterruptedexception,executionexception,timeoutexception{ if(unit ==null) thrownewnullpointerexception(); int s = state; // 如果任务未中断,调用awaitdone方法等待任务结果 if(s <= completing && (s =awaitdone(true, unit.tonanos(timeout))) completing){ if(q !=null) q.thread =null; return s; } elseif(s == completing)// cannot time out yet // thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。 // completing状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为normal或者exceptional thread.yield(); elseif(q ==null) // 每调用一次get方法,都会创建一个waitnode等待节点 q =newwaitnode(); elseif(!queued) // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入 queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是new elseif(timed){ nanos = deadline -system.nanotime(); if(nanos = cancelled) thrownewcancellationexception(); thrownewexecutionexception((throwable)x); }
2.5 cancel 方法介绍
cancel 方法用于取消正在运行的任务,如果任务取消成功,则返回 true,如果取消失败则返回 false。
// mayinterruptifrunning:允许中断正在运行的任务publicbooleancancel(boolean mayinterruptifrunning){ // mayinterruptifrunning如果为true则将状态置为interrupting,如果未false则将状态置为cancelled if(!(state == new && unsafe.compareandswapint(this, stateoffset, new, mayinterruptifrunning ? interrupting : cancelled))) returnfalse; // 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用thread的interrupt方法中断 try{// in case call to interrupt throws exception if(mayinterruptifrunning){ try{ thread t = runner; if(t !=null) t.interrupt(); }finally{// final state unsafe.putorderedint(this, stateoffset, interrupted); } } }finally{ // 取消后的收尾工作 finishcompletion(); } returntrue; }
2.6 isdone/iscancelled 方法介绍
isdone 方法用于判断 futuretask 是否已经完成;iscancelled 方法用来判断 futuretask 是否已经取消,这两个方法都是通过状态位来判断的。
publicbooleaniscancelled(){ return state >= cancelled; } publicbooleanisdone(){ return state != new; }
2.7 finishcompletion 方法介绍
我们看下 finishcompletion 方法都做了哪些工作。
// 删除所有等待线程并发出信号,最后执行done方法privatevoidfinishcompletion(){ // assert state > completing; for(waitnode q;(q = waiters)!=null;){ if(unsafe.compareandswapobject(this, waitersoffset, q,null)){ for(;;){ thread t = q.thread; if(t !=null){ q.thread =null; locksupport.unpark(t); } waitnode next = q.next; if(next ==null) break; q.next =null;// unlink to help gc q = next; } break; } } done(); callable =null;// to reduce footprint }我们看到 done 方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.executorcompletionservice.queueingfuture。protectedvoiddone(){}
3、总结
通过源码解读可以了解到 future 的原理:
第一步:主线程将任务封装成一个 callable 对象,通过 submit 方法提交到线程池去执行。
第二步:线程池执行任务的 run 方法,主线程则可以继续执行其他逻辑。
第三步:线程池中方法执行完成后将结果赋值到 outcome 属性上,并修改任务状态。
第四步:主线程在需要拿到异步任务结果的时候,主动调用 fugure.get () 方法来获取结果。
第五步:如果异步线程在执行过程中发生异常,则会在调用 future.get () 方法的时候抛出来。 以上就是对于 futuretask 的分析,我们可以了解 futuretask 任务执行的方式以及 future.get 已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解 futuretask 的任务执行状态以及状态的变化过程。
芯片附近0.1uF电容有什么用?
苹果13配置颜色
TensorFlow的衰落与PyTorch的崛起
气体涡轮流量计产品的主要特点及使用注意事项
三星拿下了SaskTel电信网络运营商的5G订单
FutureTask是如何通过阻塞来获取到异步线程执行结果的呢?
工业计算机与商用计算机的区别
域控制器架构的优点有哪些
你知道拼接屏的优势有哪些吗?拼出你的无限创意,拼接屏优势大揭秘
开沃汽车与大唐移动和南京铁塔签署了5G智能网联战略合作框架协议
海信冰箱启幕“真空+智能”新时代,科技加持赢得“智慧”未来
UAS有效载荷在更小的封装中获得更智能的传感器
DENSO斥资2.7亿美元促进车载OLED显示器量产
湖南电信全屋wifi_让家庭生活更加智慧
LM1117-3.3的输出电流计算方式分析
支持多核处理 诺基亚将推出两款WP8系统新机
“虚拟电厂”是怎么发电的?
贝恩资本以180亿美元拿下东芝存储器
李彦宏:陆奇很优秀 现在百度没有任何项目直接向我汇报
压电薄膜传感器的性能特点及在心音心电监测系统中的应用