摘要: 原創出處 http://cmsblogs.com/?p=2269 「小明哥」歡迎轉載,保留摘要,謝謝!
  • 應用示例
  • 實現分析

此篇博客所有源碼均來自JDK 1.8

前面三篇博客分別介紹了CyclicBarrier、CountDownLatch、Semaphore,現在介紹併發工具類中的最後一個Exchange。Exchange是最簡單的也是最複雜的,簡單在於API非常簡單,就一個構造方法和兩個exchange()方法,最複雜在於它的實現是最複雜的(反正我是看暈了的)。

在API是這麼介紹的:可以在對中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange 方法,與夥伴線程進行匹配,並且在返回時接收其夥伴的對象。Exchanger 可能被視爲 SynchronousQueue 的雙向形式。Exchanger 可能在應用程序(比如遺傳算法和管道設計)中很有用。

Exchanger,它允許在併發任務之間交換數據。具體來說,Exchanger類允許在兩個線程之間定義同步點。當兩個線程都到達同步點時,他們交換數據結構,因此第一個線程的數據結構進入到第二個線程中,第二個線程的數據結構進入到第一個線程中。

應用示例

Exchange實現較爲複雜,我們先看其怎麼使用,然後再來分析其源碼。現在我們用Exchange來模擬生產-消費者問題:

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

運行結果:

[2017022100011]

首先生產者Producer、消費者Consumer首先都創建一個緩衝列表,通過Exchanger來同步交換數據。消費中通過調用Exchanger與生產者進行同步來獲取數據,而生產者則通過for循環向緩存隊列存儲數據並使用exchanger對象消費者同步。到消費者從exchanger哪裏得到數據後,他的緩衝列表中有3個數據,而生產者得到的則是一個空的列表。上面的例子充分展示了消費者-生產者是如何利用Exchanger來完成數據交換的。

在Exchanger中,如果一個線程已經到達了exchanger節點時,對於它的夥伴節點的情況有三種:

  1. 如果它的夥伴節點在該線程到達之前已經調用了exchanger方法,則它會喚醒它的夥伴然後進行數據交換,得到各自數據返回。
  2. 如果它的夥伴節點還沒有到達交換點,則該線程將會被掛起,等待它的夥伴節點到達被喚醒,完成數據交換。
  3. 如果當前線程被中斷了則拋出異常,或者等待超時了,則拋出超時異常。

實現分析

Exchanger算法的核心是通過一個可交換數據的slot,以及一個可以帶有數據item的參與者。源碼中的描述如下:

 for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}

Exchanger中定義瞭如下幾個重要的成員變量:

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;

participant的作用是爲每個線程保留唯一的一個Node節點。

slot爲單個槽,arena爲數組槽。他們都是Node類型。在這裏可能會感覺到疑惑,slot作爲Exchanger交換數據的場景,應該只需要一個就可以了啊?爲何還多了一個Participant 和數組類型的arena呢?一個slot交換場所原則上來說應該是可以的,但實際情況卻不是如此,多個參與者使用同一個交換場所時,會存在嚴重伸縮性問題。既然單個交換場所存在問題,那麼我們就安排多個,也就是數組arena。通過數組arena來安排不同的線程使用不同的slot來降低競爭問題,並且可以保證最終一定會成對交換數據。但是Exchanger不是一來就會生成arena數組來降低競爭,只有當產生競爭是纔會生成arena數組。那麼怎麼將Node與當前線程綁定呢?Participant ,Participant 的作用就是爲每個線程保留唯一的一個Node節點,它繼承ThreadLocal,同時在Node節點中記錄在arena中的下標index。

Node定義如下:

 @sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
  • index:arena的下標;
  • bound:上一次記錄的Exchanger.bound;
  • collides:在當前bound下CAS失敗的次數;
  • hash:僞隨機數,用於自旋;
  • item:這個線程的當前項,也就是需要交換的數據;
  • match:做releasing操作的線程傳遞的項;
  • parked:掛起時設置線程值,其他情況下爲null;

在Node定義中有兩個變量值得思考:bound以及collides。前面提到了數組area是爲了避免競爭而產生的,如果系統不存在競爭問題,那麼完全沒有必要開闢一個高效的arena來徒增系統的複雜性。首先通過單個slot的exchanger來交換數據,當探測到競爭時將安排不同的位置的slot來保存線程Node,並且可以確保沒有slot會在同一個緩存行上。如何來判斷會有競爭呢?CAS替換slot失敗,如果失敗,則通過記錄衝突次數來擴展arena的尺寸,我們在記錄衝突的過程中會跟蹤“bound”的值,以及會重新計算衝突次數在bound的值被改變時。這裏闡述可能有點兒模糊,不着急,我們先有這個概念,後面在arenaExchange中再次做詳細闡述。

我們直接看exchange()方法

exchange(V x)

exchange(V x):等待另一個線程到達此交換點(除非當前線程被中斷),然後將給定的對象傳送給該線程,並接收該線程的對象。方法定義如下:

 public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}

這個方法比較好理解:arena爲數組槽,如果爲null,則執行slotExchange()方法,否則判斷線程是否中斷,如果中斷值拋出InterruptedException異常,沒有中斷則執行arenaExchange()方法。整套邏輯就是:如果slotExchange(Object item, boolean timed, long ns)方法執行失敗了就執行arenaExchange(Object item, boolean timed, long ns)方法,最後返回結果V。

NULL_ITEM 爲一個空節點,其實就是一個Object對象而已,slotExchange()爲單個slot交換。

slotExchange(Object item, boolean timed, long ns)

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

程序首先通過participant獲取當前線程節點Node。檢測是否中斷,如果中斷return null,等待後續拋出InterruptedException異常。

如果slot不爲null,則進行slot消除,成功直接返回數據V,否則失敗,則創建arena消除數組。

如果slot爲null,但arena不爲null,則返回null,進入arenaExchange邏輯。

如果slot爲null,且arena也爲null,則嘗試佔領該slot,失敗重試,成功則跳出循環進入spin+block(自旋+阻塞)模式。

在自旋+阻塞模式中,首先取得結束時間和自旋次數。如果match(做releasing操作的線程傳遞的項)爲null,其首先嚐試spins+隨機次自旋(改自旋使用當前節點中的hash,並改變之)和退讓。當自旋數爲0後,假如slot發生了改變(slot != p)則重置自旋數並重試。否則假如:當前未中斷&arena爲null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不爲null或者當前爲限時版本+時間已經結束:不限時版本:置v爲null;限時版本:如果時間結束以及未中斷則TIMED_OUT;否則給出null(原因是探測到arena非空或者當前線程中斷)。

match不爲空時跳出循環。

整個slotExchange清晰明瞭。

arenaExchange(Object item, boolean timed, long ns)

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

【死磕 Java 併發】—– J.U.C 之併發工具類:Exchanger

首先通過participant取得當前節點Node,然後根據當前節點Node的index去取arena中相對應的節點node。前面提到過arena可以確保不同的slot在arena中是不會相沖突的,那麼是怎麼保證的呢?我們先看arena的創建:

arena = new Node[(FULL + 2) << ASHIFT];


這個arena到底有多大呢?我們先看FULL 和ASHIFT的定義:

static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff; // 255

假如我的機器NCPU = 8 ,則得到的是768大小的arena數組。然後通過以下代碼取得在arena中的節點:

 Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);


他仍然是通過右移ASHIFT位來取得Node的,ABASE定義如下:

Class> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);


U.arrayBaseOffset獲取對象頭長度,數組元素的大小可以通過unsafe.arrayIndexScale(T[].class) 方法獲取到。這也就是說要訪問類型爲T的第N個元素的話,你的偏移量offset應該是arrayOffset+N*arrayScale。也就是說BASE = arrayOffset+ 128 。其次我們再看Node節點的定義

 @sun.misc.Contended static final class Node{
....
}

在Java 8 中我們是可以利用sun.misc.Contended來規避僞共享的。所以說通過 << ASHIFT方式加上sun.misc.Contended,所以使得任意兩個可用Node不會再同一個緩存行中。

關於僞共享請參考如下博文:

僞共享(False Sharing)

[ Java8中用sun.misc.Contended避免僞共享(false sharing)](

http://blog.csdn.net/aigoogle/article/details/41518369)

我們再次回到arenaExchange()。取得arena中的node節點後,如果定位的節點q 不爲空,且CAS操作成功,則交換數據,返回交換的數據,喚醒等待的線程。

如果q等於null且下標在bound & MMASK範圍之內,則嘗試佔領該位置,如果成功,則採用自旋 + 阻塞的方式進行等待交換數據。

如果下標不在bound & MMASK範圍之內獲取由於q不爲null但是競爭失敗的時候:消除p。加入bound 不等於當前節點的bond(b != p.bound),則更新p.bound = b,collides = 0 ,i = m或者m - 1。如果衝突的次數不到m 獲取m 已經爲最大值或者修改當前bound的值失敗,則通過增加一次collides以及循環遞減下標i的值;否則更新當前bound的值成功:我們令i爲m+1即爲此時最大的下標。最後更新當前index的值。

Exchanger使用、原理都比較好理解,但是這個源碼看起來真心有點兒複雜,是真心難看懂,但是這種交換的思路Doug Lea在後續博文中還會提到,例如SynchronousQueue、LinkedTransferQueue。

最後用一個在網上看到的段子結束此篇博客(http://brokendreams.iteye.com/blog/2253956),博主對其做了一點點修改,以便更加符合在1.8環境下的Exchanger:

其實就是"我"和"你"(可能有多個"我",多個"你")在一個叫Slot的地方做交易(一手交錢,一手交貨),過程分以下步驟:

  1. 我先到一個叫做Slot的交易場所交易,發現你已經到了,那我就嘗試喊你交易,如果你迴應了我,決定和我交易那麼進入第2步;如果別人搶先一步把你喊走了,那我就進入第5步。
  2. 我拿出錢交給你,你可能會接收我的錢,然後把貨給我,交易結束;也可能嫌我掏錢太慢(超時)或者接個電話(中斷),TM的不賣了,走了,那我只能再找別人買貨了(從頭開始)。
  3. 我到交易地點的時候,你不在,那我先嚐試把這個交易點給佔了(一屁股做凳子上…),如果我成功搶佔了單間(交易點),那就坐這兒等着你拿貨來交易,進入第4步;如果被別人搶座了,那我只能在找別的地方兒了,進入第5步。
  4. 你拿着貨來了,喊我交易,然後完成交易;也可能我等了好長時間你都沒來,我不等了,繼續找別人交易去,走的時候我看了一眼,一共沒多少人,弄了這麼多單間(交易地點Slot),太TM浪費了,我喊來交易地點管理員:一共也沒幾個人,搞這麼多單間兒幹毛,給哥撤一個!。然後再找別人買貨(從頭開始);或者我老大給我打了個電話,不讓我買貨了(中斷)。
  5. 我跑去喊管理員,尼瑪,就一個坑交易個毛啊,然後管理在一個更加開闊的地方開闢了好多個單間,然後我就挨個來看每個單間是否有人。如果有人我就問他是否可以交易,如果迴應了我,那我就進入第2步。如果我沒有人,那我就佔着這個單間等其他人來交易,進入第4步。
  6. 如果我嘗試了幾次都沒有成功,我就會認爲,是不是我TM選的這個單間風水不好?不行,得換個地兒繼續(從頭開始);如果我嘗試了多次發現還沒有成功,怒了,把管理員喊來:給哥再開一個單間(Slot),加一個凳子,這麼多人就這麼幾個破凳子夠誰用!

擴展閱讀

J.U.C系列-線程安全的理論講解

「死磕 Java 併發」JUC 之 Java 併發容器:ConcurrentLinkedQueue「死磕 Java 併發」J.U.C 之 Java併發容器:ConcurrentHashMap

「死磕 Java 併發」J.U.C 之 ConcurrentHashMap 紅黑樹轉換分析

Java高併發秒殺API之高併發優化

相關文章