1、FutureTask 對象介紹
Future 對象大家都不陌生,是 JDK1.5 提供的接口,是用來以阻塞的方式獲取線程異步執行完的結果。 在 Java 中想要通過線程執行一個任務,離不開 Runnable 與 Callable 這兩個接口。 Runnable 與 Callable 的區別在于,Runnable 接口只有一個 run 方法,該方法用來執行邏輯,但是并沒有返回值;而 Callable 的 call 方法,同樣用來執行業務邏輯,但是是有一個返回值的。
Callable 執行任務過程中可以通過 FutureTask 獲得任務的執行狀態,并且可以在執行完成后通過 Future.get () 方式獲取執行結果。 Future 是一個接口,而 FutureTask 就是 Future 的實現類。并且 FutureTask 實現了 RunnableFuture(Runnable + Future),說明我們可以創建一個 FutureTask 并直接把它放到線程池執行,然后獲取 FutureTask 的執行結果。
2、FutureTask 源碼解析
2.1 主要方法和屬性
那么 FutureTask 是如何通過阻塞的方式來獲取到異步線程執行的結果的呢?我們看下 FutureTask 中的屬性。
// FutureTask的狀態及其常量
privatevolatileint state;
privatestaticfinalint NEW =0;
privatestaticfinalint COMPLETING =1;
privatestaticfinalint NORMAL =2;
privatestaticfinalint EXCEPTIONAL =3;
privatestaticfinalint CANCELLED =4;
privatestaticfinalint INTERRUPTING =5;
privatestaticfinalint INTERRUPTED =6;
// callable對象,執行完后置空
privateCallable callable;
// 要返回的結果或要引發的異常來自 get() 方法
privateObject outcome;// non-volatile, protected by state reads/writes
// 執行Callable的線程
privatevolatileThread runner;
// 等待線程的一個鏈表結構
privatevolatileWaitNode waiters;
?FutureTask 中幾個比較重要的方法。
// 取消任務的執行 booleancancel(boolean mayInterruptIfRunning); // 返回任務是否已經被取消 booleanisCancelled(); // 返回任務是否已經完成,任務狀態不為NEW即為完成 booleanisDone(); // 通過get方法獲取任務的執行結果 Vget()throwsInterruptedException,ExecutionException; // 通過get方法獲取任務的執行結果,帶有超時,如果超過給定時間則拋出異常 Vget(long timeout,TimeUnit unit) throwsInterruptedException,ExecutionException,TimeoutException;
?2.2 FutureTask 執行
當我們在線程池中執行一個 Callable 方法時,其實是將 Callable 任務封裝成一個 RunnableFuture 對象去執行,同時將這個 RunnableFuture 對象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時獲取到任務執行的狀態,并且可以在任務執行完成后通過該對象獲取執行結果。 以下為 ThreadPoolExecutor 線程池提交一個 callable 方法的源碼。
publicFuture submit(Callable task){ if(task ==null)thrownewNullPointerException(); RunnableFuture ftask =newTaskFor(task); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Callable callable){ returnnewFutureTask (callable); }
?2.3 run 方法介紹
RunnableFuture 其實也是一個可以執行的 runnable,我們看下他的 run 方法。其主要流程就是執行 call 方法,正常執行完畢后將 result 結果賦值到 outcome 屬性上。
publicvoidrun(){
if(state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null,Thread.currentThread()))
return;
try{
// 將callable賦值到本地變量
Callable c = callable;
// 判斷callable不為空并且FutureTask的狀態必須為新創建
if(c !=null&& state == NEW){
V result;
boolean ran;
try{
// 執行call方法(用戶自己實現的call邏輯),并獲取到result結果
result = c.call();
ran =true;
}catch(Throwable ex){
result =null;
ran =false;
// 如果執行過程出現異常,則將異常對象賦值到outcome上
setException(ex);
}
// 如果正常執行完畢,則將result賦值到outcome屬性上
if(ran)
set(result);
}
}finally{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner =null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
?以下邏輯為正常執行完成后賦值的邏輯。
// 如果任務沒有被取消,將future執行完的返回值賦值給result結果
// FutureTask任務的執行狀態是通過CAS的方式進行賦值的,并且由此可知,COMPLETING其實是一個瞬時狀態
// 當將線程執行結果賦值給outcome后,狀態會修改為對應的NORMAL,即正常結束
protectedvoidset(V v){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
finishCompletion();
}
}
?以下為執行異常時賦值邏輯,直接將 Throwable 對象賦值到 outcome 屬性上。
protectedvoidsetException(Throwable t){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
finishCompletion();
}
}
?無論是正常執行還是異常執行,最終都會調用一個 finishCompletion 方法,用來做工作的收尾工作。
2.4 get 方法介紹
Future 的 get 方法有兩個重載的方法,一個是 get () 獲取結果,一個是 get (long, TimeUnit) 帶有超時時間的獲取結果,我們看下 FutureTask 中的這兩個方法是如何實現的。
// 不帶有超時時間,一直阻塞直到獲取結果
publicVget()throwsInterruptedException,ExecutionException{
int s = state;
if(s <= COMPLETING)
// 等待結果完成,帶有超時的get方法也是調用的awaitDone方法
s =awaitDone(false,0L);
// 返回結果
returnreport(s);
}
// 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常
publicVget(long timeout,TimeUnit unit)
throwsInterruptedException,ExecutionException,TimeoutException{
if(unit ==null)
thrownewNullPointerException();
int s = state;
// 如果任務未中斷,調用awaitDone方法等待任務結果
if(s <= COMPLETING &&
(s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
thrownewTimeoutException();
// 返回結果
returnreport(s);
}
?我們主要看下 awaitDone 方法的執行邏輯。此方法會通過 for 循環的方式一直阻塞等待任務執行完成。如果帶有超時時間,則超過截止時間后會直接返回。
// timed:是否需要超時獲取
// nanos:超時時間單位納秒
privateintawaitDone(boolean timed,long nanos)
throwsInterruptedException{
finallong deadline = timed ?System.nanoTime()+ nanos :0L;
WaitNode q =null;
boolean queued =false;
// 此方法會一直for循環判斷任務狀態是否已經完成,是Future.get阻塞的原因
for(;;){
if(Thread.interrupted()){
removeWaiter(q);
thrownewInterruptedException();
}
int s = state;
// 任務狀態大于COMPLETING,則表明任務結束,直接返回
if(s > COMPLETING){
if(q !=null)
q.thread =null;
return s;
}
elseif(s == COMPLETING)// cannot time out yet
// Thread.yield() 方法,使當前線程由執行狀態,變成為就緒狀態,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
// COMPLETING狀態為瞬時狀態,任務執行完成,要么是正常結束,要么異常結束,后續會被置為NORMAL或者EXCEPTIONAL
Thread.yield();
elseif(q ==null)
// 每調用一次get方法,都會創建一個WaitNode等待節點
q =newWaitNode();
elseif(!queued)
// 將該等待節點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果方法帶有超時判斷,則判斷當前時間是否已經超過了截止時間,如果超過了及截止日期,則退出循環直接返回當前狀態,此時任務狀態一定是NEW
elseif(timed){
nanos = deadline -System.nanoTime();
if(nanos <=0L){
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
?我們在看下 report 方法,在調用 get 方法時是如何返回結果的。這里首先獲取 outcome 的值,并判斷任務是否已經執行完成,如果執行完成,則將 outcome 對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個 CancellationException 異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀態位 EXCEPTIONAL,此時的 outcome 即為 Throwable 對象,所以將 outcome 強轉為 Throwable 并拋出異常。
由此可以知道,我們將一個 FutureTask 任務 submit 到線程池中執行的時候,如果發生了異常,是會在調用 get 方法的時候拋出的。
privateVreport(int s)throwsExecutionException{ Object x = outcome; if(s == NORMAL) return(V)x; if(s >= CANCELLED) thrownewCancellationException(); thrownewExecutionException((Throwable)x); }
?2.5 cancel 方法介紹
cancel 方法用于取消正在運行的任務,如果任務取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。
// mayInterruptIfRunning:允許中斷正在運行的任務
publicbooleancancel(boolean mayInterruptIfRunning){
// mayInterruptIfRunning如果為true則將狀態置為INTERRUPTING,如果未false則將狀態置為CANCELLED
if(!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
returnfalse;
// 如果狀態修改成功后,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
try{// in case call to interrupt throws exception
if(mayInterruptIfRunning){
try{
Thread t = runner;
if(t !=null)
t.interrupt();
}finally{// final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}finally{
// 取消后的收尾工作
finishCompletion();
}
returntrue;
}
?2.6 isDone/isCancelled 方法介紹
isDone 方法用于判斷 FutureTask 是否已經完成;isCancelled 方法用來判斷 FutureTask 是否已經取消,這兩個方法都是通過狀態位來判斷的。
publicbooleanisCancelled(){
return state >= CANCELLED;
}
publicbooleanisDone(){
return state != NEW;
}
?2.7 finishCompletion 方法介紹
我們看下 finishCompletion 方法都做了哪些工作。
// 刪除所有等待線程并發出信號,最后執行done方法 privatevoidfinishCompletion(){ // assert state > COMPLETING; for(WaitNode q;(q = waiters)!=null;){ if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){ for(;;){ Thread t = q.thread; if(t !=null){ q.thread =null; LockSupport.unpark(t); } WaitNode next = q.next; if(next ==null) break; q.next =null;// unlink to help gc q = next; } break; } } done(); callable =null;// to reduce footprint }?我們看到 done 方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實現相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}
3、總結
通過源碼解讀可以了解到 Future 的原理:
第一步:主線程將任務封裝成一個 Callable 對象,通過 submit 方法提交到線程池去執行。
第二步:線程池執行任務的 run 方法,主線程則可以繼續執行其他邏輯。
第三步:線程池中方法執行完成后將結果賦值到 outcome 屬性上,并修改任務狀態。
第四步:主線程在需要拿到異步任務結果的時候,主動調用 fugure.get () 方法來獲取結果。
第五步:如果異步線程在執行過程中發生異常,則會在調用 future.get () 方法的時候拋出來。 以上就是對于 FutureTask 的分析,我們可以了解 FutureTask 任務執行的方式以及 Future.get 已阻塞的方式獲取線程執行的結果原理,并且從代碼中可以了解 FutureTask 的任務執行狀態以及狀態的變化過程。
審核編輯:劉清
-
狀態機
+關注
關注
2文章
499瀏覽量
29145 -
線程池
+關注
關注
0文章
58瀏覽量
7390 -
for循環
+關注
關注
0文章
61瀏覽量
2885
原文標題:并發編程 - FutureTask 解析
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
FutureTask是如何通過阻塞來獲取到異步線程執行結果的呢?
評論