Future模式:

是一種經典的空間換取事件的概念,也就是一種非同步的執行(需要開啟一個新的線程)。Future表示一個任務的生命週期,並提供了方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務的介面

public interface Future<V> {
/**
* 提供了中斷/取消線程的方法
* mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。
* 當線程已經執行完成是 無論mayInterruptIfRunning 為true或false,結果都是false,即如果取消已經完成的任務會返回false;
* 任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;
* 如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回tr
*/
boolean cancel(boolean mayInterruptIfRunning);
/*
* 用於檢驗線程是否被中斷,如果是在完成前中斷的就返回true
* 在線程還沒有開始的時候調用cancel 之後查詢中斷結果也是true
*/
boolean isCancelled();
/**
* 查詢線程是否完成,只要線程停止就返回true
*
*/
boolean isDone();
/**
*用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
*/
V get() throws InterruptedException, ExecutionException;
/**
*獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null
*/
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException;
}

總結來說:Future介面提供了中斷任務,查詢任務狀態和獲取執行結果

FutureTask

Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。同時FutureTask也實現了Runable介面,所以FutureTask可以交給Executor執行

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

  • FutureTask中屬性介紹:

state表示任務的狀態,Runner表示工作時的線程,

/*此任務的運行狀態,最初為NEW。運行狀態僅在set、setException和cancel方法中轉換為終端狀態。在完成過程中,
狀態可能具有COMPLETING(在設置結果時)或INTERRUPTING(僅在中斷運行程序以滿足取消(true)時)的瞬態值。
從這些中間狀態到最終狀態的轉換使用更便宜的有序/延遲寫,因為值是惟一的,不能進一步修改
可能的狀態轉換:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/**具體run運行時會調用其方法call(),並獲得結果,結果時置為null.*/
private Callable<V> callable;
/**
*The result to return or exception to throw from get()
*non-volatile, protected by state reads/writes
*/
private Object outcome;
/*具體的worker thread.*/
private volatile Thread runner;
/* Treiber stack 並發stack數據結構,用於存放阻塞在該futuretask#get方法的線程。*/
private volatile WaitNode waiters;

  • FutureTask方法介紹

FutureTask除了實現了Future的介面以外還實現了Runnable的run()方法。

1.先說一下FutureTask的構造方法:

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
//將Runnbale分裝成一個Callable類,其返回結果就為result
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

FutureTask最後會將需要運行的線程轉為Callable類型。

2.run方法

a.最佳的方式應該是從ExecutorService中的submit方法說起,這裡的submit實際上調用的是

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//創建一個FutureTask對象
RunnableFuture<Void> ftask = newTaskFor(task, null);
//執行FutureTask的Run方法 (如果不明白下次會對線程次的excute方法專門寫一篇)
execute(ftask);
return ftask;
}

b.run()

public void run() {
// 先判斷state狀態,如果不是NEW說明執行完畢,直接return掉。
//後面使用CAS操作,判斷這個任務是否已經執行,這裡FutureTask有個全局的volatile runner欄位,
//這裡通過cas將當前線程指定給runner。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//如果執行成功將結果賦值result
//狀態修改狀態,設置最終狀態
set(result);
}
} finally {
// runner必須是非null,直到狀態被設置為
//防止並發調用run()
runner = null;
//必須在為空轉輪後重新讀取狀態,以防止發生這種情況中斷泄露
int s = state;
// 當狀態大於或等於INTERRUPTING,調用handlePossibleCancellationInterrupt方法,
// 等待別的線程將狀態設置成INTERRUPTED
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

解釋:

UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())

相當於

if (this.runner == null ){
this.runner = Thread.currentThread();
}

設置最終結果狀態

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 調用finishCompletion喚醒所有等待結果的線程
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 調用finishCompletion喚醒所有等待結果的線程
finishCompletion();
}
}

執行流程

1.判斷線程是不是沒有執行,並且將線程複製給runner,

2.如果Callble對象不為空,且線程沒有執行過,那麼調用call()方法獲取同步獲取執行結果

如果出現異常,調用setException 如果成功調用set將結果插入。喚起等待的線程3.如果是中斷的,那麼將FutureTask中其他的等待線程狀態改為中斷

注意:WaitNode中存放的線程都是調用當前FutureTask的線程,所以喚起的也是調用當前FutureTask對象方法的線程

3.get方法

get方法分為兩種一個是限時阻塞,一個是一直阻塞等到獲取結果

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果還沒有開始等待完成
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

awaitDone:等待線程結束

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//判斷線程是否已經中斷
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//線程已經完成
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果線程只在執行那麼讓出線程
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
//如果其他線程也來執行get方法將waitNode的next指向新的
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//如果是限時的判斷是不是超過時間了,超過直接返回
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//喚醒park池中等待結果的線程
LockSupport.parkNanos(this, nanos);
}else
//喚醒park池中等待結果的線程
LockSupport.park(this);
}
}

UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
====>
if(waiter == (q.next = waiters))){
waiters = q;
}

report 最後返回值封裝

private V report(int s) throws ExecutionException {
Object x = outcome;
//正常執行 返回結果
if (s == NORMAL)
return (V)x;
//如果是取消的,那麼返回中斷錯誤
if (s >= CANCELLED)
throw new CancellationException();
//其他認為是執行錯誤
throw new ExecutionException((Throwable)x);
}

流程說明:

1. 判斷當前線程是否結束 根據state的狀態來判斷 <= COMPLETING 2. 如果線程沒有結束 調用awaitDone方法,如果設置了Timeout那麼就會限制阻塞等待的時間,2.1 判斷線程是否中斷已經中斷2.2 判斷線程是否已經完成,如果結束將FutureTask中內置對象waitNode的線程清空(防止重複執行),返回完成狀態2.3 如果線程執行中那麼讓出時間片段2.4 如果需要執行的線程沒有在waitNode對象,那麼創建2.5 如果其他線程也來執行get方法將waitNode的next指向新的2.6 如果有設置timeout,如果超過超時時間,那麼久直接返回狀態3.0 根據返回的狀態,去判斷如果是normal那麼久返回執行結果,其他狀態報錯

注意:步驟二是一個阻塞的過程,通過輪訓的方式不斷去詢問FutureTask狀態

4.其他方法

檢驗的方法都是通過判斷state的狀態,來區分是否操作成功

public boolean isCancelled() {
return state >= CANCELLED;
}

public boolean isDone() {
return state != NEW;
}
//中斷任務就是通過 Thread中的interrupt
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
/** 線程的thread.interrupt()方法是中斷線程,將會設置該線程的中斷狀態位,
即設置為true,中斷的結果線程是死亡、還是等待新的任務或是繼續運行至下一步,就取決於這個程序本身
。線程會不時地檢測這個中斷標示位,以判斷線程是否應該被中斷(中斷標示值是否為true)。
它並不像stop方法那樣會中斷一個正在運行的線程。**/
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

推薦閱讀:

相關文章