在上一篇文章《從0到1實現自己的阻塞隊列(上)》中,我們已經實現了一個可以使用的阻塞隊列版本。在這篇文章中,我們可以繼續我們的冒險之旅,將我們的阻塞隊列提升到接近JDK版本的水平上。

更進一步優化效率

我們一直使用的都是Object.notifyAll()或者condition.signalAll()這樣會喚醒所有線程的方法,那麼如果只有一個線程能夠順利執行,但是其他線程都要再次回到等待狀態繼續休眠,那不是非常的浪費嗎?比如如果有N個消費者線程在等待隊列中出現元素,那麼當一個元素被插入以後所有N個消費者線程都被全部喚醒,最後也只會有一個消費者線程能夠真正拿到元素並執行完成,其他線程不是都被白白喚醒了嗎?我們為什麼不用只會喚醒一個線程的Object.notify()condition.signal()方法呢?

拆分條件變數

在阻塞隊列中,我們可以使用Object.notify()或者condition.signal()這樣只喚醒一個線程的方法,但是會有一些前提條件:

  1. 首先,在一個條件變數上等待的線程必須是同一類型的。比如一個條件變數上只有消費者線程在等待,另一個條件變數上只有生產者線程在等待。這麼做的目的就是防止發生我們在插入時想喚醒的是消費者線程,但是喚醒了一個生產者線程,這個生產者線程又因為隊列已滿又進入了等待狀態,這樣我們需要喚醒的消費者線程就永遠不會被喚醒了。
  2. 另外還有一點就是這個條件變數上等待的線程只能互斥執行,如果N個生產者線程可以同時執行,我們也就不需要一個一個喚醒了,這樣反而會讓效率降低。當然,在我們的阻塞隊列當中,不管是插入還是彈出操作同一時間都只能有一個線程在執行,所以自然就滿足這個要求了。

所以,我們只需要滿足第一個要求讓不同類型的線程在不同的條件變數上等待就可以了。那麼具體要怎麼做呢?

首先,我們自然是要把原來的一個條件變數condition給拆分成兩個實例變數notFullnotEmpty,這兩個條件變數雖然對應於同一互斥鎖,但是兩個條件變數的等待和喚醒操作是完全隔離的。這兩個條件變數分別代表隊列未滿和隊列非空兩個條件,消費者線程因為是被隊列為空的情況所阻塞的,所以就應該等待隊列非空條件得到滿足;而生產者線程因為是被隊列已滿的情況所阻塞的,自然就要等待隊列未滿條件的成立。

/** 隊列未滿的條件變數 */
private final Condition notFull = lock.newCondition();

/** 隊列非空的條件變數 */
private final Condition notEmpty = lock.newCondition();

所以在put()take()方法中,我們就需要把take()方法中原來的condition.await()修改為等待隊列非空條件,即notEmpty.await();而put()方法中的condition.await()自然是要修改為等待隊列未滿條件成立,即notFull.await()。既然我們把等待條件變數的語句都改了,那麼喚醒的語句也要做同樣的修改,put()操作要喚醒等待的消費者線程,所以是notEmpty.signal()take()操作要喚醒的生產者線程,所以是notFull.signal()。修改完成後的代碼如下,大家可以參考一下:

/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件得到滿足
notFull.await();
}

// 執行入隊操作,將對象e實際放入隊列中
enqueue(e);

// 插入元素後喚醒一個等待隊列非空條件成立的線程
notEmpty.signal();
} finally {
lock.unlock();
}
}

/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0) {
// 隊列為空時進入休眠
// 等待隊列非空條件得到滿足
notEmpty.await();
}

// 執行出隊操作,將隊列中的第一個元素彈出
Object e = dequeue();

// 彈出元素後喚醒一個等待隊列未滿條件成立的線程
notFull.signal();

return e;
} finally {
lock.unlock();
}
}

驗證程序的效率

既然我們對阻塞隊列做了效率上的改進,那麼就讓我們來實際檢驗一下吧。我們還是之前已經提供的檢驗程序,但是不同的是,為了明顯地看出效率上的變化,我們需要修改一下程序中的兩個變數。首先,我們需要把檢驗程序中運行的線程數threads增加到400,然後我們需要把每個線程執行的次數改為100次,就像下面這樣:

// 創建400個線程
final int threads = 400;
// 每個線程執行100次
final int times = 100;

最後我們分別使用改進前和改進後的版本來執行這個這個阻塞隊列,在我的電腦上,改進前的版本耗時為7.80s,改進後的版本耗時為1.35s。看起來我們對阻塞隊列的效率做了一個非常大的提升,非常好,那我們還有沒有辦法再加快一點呢?

還能不能更快?

在上面的阻塞隊列實現中,我們主要使用的就是put()take()兩個操作。而因為有互斥鎖ReentrantLock的保護,所以這兩個方法在同一時間只能有一個線程調用。也就是說,生產者線程在操作隊列時同樣會阻塞消費者線程。不過從我們的代碼中看,實際上put()方法和take()方法之間需要有互斥鎖保護的共享數據訪問只發生在入隊操作enqueue方法和出隊操作dequeue方法之中。在這兩個方法裏,對於putIndextakeIndex的訪問是完全隔離的,enqueue只使用putIndex,而dequeue只使用takeIndex,那麼線程間的競爭性數據就只剩下count了。這樣的話,如果我們能解決count的更新問題是不是就可以把鎖lock拆分為兩個互斥鎖,分別讓生產者線程和消費者線程使用了呢?這樣的話生產者線程在操作時就只會阻塞生產者線程而不會阻塞消費者線程了,消費者線程也是一樣的道理。

拆分鎖

這時候就要請出我們很熟悉的一種同步工具CAS了,CAS是一個原子操作,它會接收兩個參數,一個是當前值,一個是目標值,如果當前值已經發生了改變,那麼就會返回失敗,而如果當前值沒有變化,就會將這個變數修改為目標值。在Java中,我們一般會通過java.util.concurrent中的AtomicInteger來執行CAS操作。在AtomicInteger類上有原子性的增加與減少方法,每次調用都可以保證對指定的對象進行增加或減少,並且即使有多個線程同時執行這些操作,它們的結果也仍然是正確的。

首先,為了保證入隊和出隊操作之間的互斥特性移除後兩個方法能夠並發執行,那麼我們就要保證對count的更新是線程安全的。因此,我們首先需要把實例變數count的類型從int修改為AtomicInteger,而AtomicInteger類就提供了我們需要的原子性的增加與減少介面。

/** 隊列中的元素總數 */
private AtomicInteger count = new AtomicInteger(0);

然後對應地,我們需要將入隊方法中的count++和出隊方法中的count--分別改為Atomic原子性的加1方法getAndIncrement與減1方法getAndDecrement

/**
* 入隊操作
*
* @param e 待插入的對象
*/
private void enqueue(Object e) {
// 將對象e放入putIndex指向的位置
items[putIndex] = e;

// putIndex向後移一位,如果已到末尾則返回隊列開頭(位置0)
if (++putIndex == items.length)
putIndex = 0;

// 增加元素總數
count.getAndIncrement();
}

/**
* 出隊操作
*
* @return 被彈出的元素
*/
private Object dequeue() {
// 取出takeIndex指向位置中的元素
// 並將該位置清空
Object e = items[takeIndex];
items[takeIndex] = null;

// takeIndex向後移一位,如果已到末尾則返回隊列開頭(位置0)
if (++takeIndex == items.length)
takeIndex = 0;

// 減少元素總數
count.getAndDecrement();

// 返回之前代碼中取出的元素e
return e;
}

到這裡,我們就已經解決了put()take()方法之間的數據競爭問題,兩個方法現在就可以分別用兩個鎖來控制了。雖然相同類型的線程仍然是互斥的,例如生產者和生產者之間同一時間只能有一個生產者線程在操作隊列。但是在生產者線程和消費者線程之間將不用再繼續互斥,一個生產者線程和一個消費者線程可以在同一時間操作同一阻塞隊列了。所以,我們在這裡可以將互斥鎖lock拆為兩個,分別保證生產者線程和消費者線程的互斥性,我們將它們命名為插入鎖putLock和彈出鎖takeLock。同時,原來的條件變數也要分別對應於不同的互斥鎖了,notFull要對應於putLock,因為插入元素的生產者線程需要等待隊列未滿條件,那麼notEmpyt自然就要對應於takeLock了。

/** 插入鎖 */
private final ReentrantLock putLock = new ReentrantLock();

/** 隊列未滿的條件變數 */
private final Condition notFull = putLock.newCondition();

/** 彈出鎖 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 隊列非空的條件變數 */
private final Condition notEmpty = takeLock.newCondition();

最後我們要對put()take()方法中的signal()調用做出一些調整。因為在上文中提到的,在使用條件變數時一定要先持有條件變數所對應的互斥鎖,而在put()take()方法中,使用signal()方法喚醒的都是另一種類型的線程,例如生產者線程喚醒消費者,消費者線程喚醒生產者。這樣我們調用signal()方法的條件變數就和try語句中持有的鎖不一致了,所以我們必須將直接的xxx.signal()調用替換為一個私有方法調用。而在私有方法中,我們會先獲取與條件變數對應的鎖,然後再調用條件變數的signal()方法。比如在下面的signalNotEmpty()方法中,我們就要先獲取takeLock才能調用notEmpty.signal();而在signalNotFull()方法中,我們就要先獲取putLock才能調用notFull.signal()

/**
* 喚醒等待隊列非空條件的線程
*/
private void signalNotEmpty() {
// 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock
takeLock.lock();
try {
// 喚醒一個等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* 喚醒等待隊列未滿條件的線程
*/
private void signalNotFull() {
// 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock
putLock.lock();
try {
// 喚醒一個等待隊列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}

解決死鎖問題

但直接把notFull.signal()換成signalNotFull(),把notEmpty.signal()換成signalNotEmpty()還不夠,因為在我們的代碼中,原來的notFull.signal()notEmpty.signal()都是在持有鎖的try語句塊當中的。一旦我們做了調用私有方法的替換,那麼put()take()方法就會以相反的順序同時獲取putLocktakeLock兩個鎖。有一些讀者可能已經意識到這樣會產生死鎖問題了,那麼我們應該怎麼解決它呢?

最好的方法就是不要同時加兩個鎖,我們完全可以在釋放前一個之後再使用signal()方法來喚醒另一種類型的線程。就像下面的put()take()方法中所做的一樣,我們可以在執行完入隊操作之後就釋放插入鎖putLock,然後才運行signalNotEmpty()方法去獲取takeLock並調用與其對應的條件變數notEmptysignal()方法,在take()方法中也是一樣的道理。

/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件得到滿足
notFull.await();
}

// 執行入隊操作,將對象e實際放入隊列中
enqueue(e);
} finally {
putLock.unlock();
}

// 喚醒等待隊列非空條件的線程
// 為了防止死鎖,不能在釋放putLock之前獲取takeLock
signalNotEmpty();
}

/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列為空時進入休眠
// 等待隊列非空條件得到滿足
notEmpty.await();
}

// 執行出隊操作,將隊列中的第一個元素彈出
e = dequeue();
} finally {
takeLock.unlock();
}

// 喚醒等待隊列未滿條件的線程
// 為了防止死鎖,不能在釋放takeLock之前獲取putLock
signalNotFull();

return e;
}

到了這裡我們就順利地把原來單一的一個lock鎖拆分為了插入鎖putLocktakeLock,這樣生產者線程和消費者線程就可以同時運行了。

最後的細節優化——優化喚醒其他線程的效率

啊?我們的阻塞隊列到了這裡還能再繼續優化嗎?其實我們做的優化已經足夠多了,基本上影響比較大的優化我們都做了,但是還有一些細節是可以最後完善一下的。比如說如果隊列並沒有為空或者已滿時,我們插入或者彈出了元素其實都是不需要喚醒任何線程的,多餘的喚醒操作需要先獲取ReentrantLock鎖才能調用對應的條件變數的signal()方法,而獲取鎖是一個成本比較大的操作。所以我們最好是能在隊列真的為空或者已滿以後,成功插入或彈出元素時,再去獲取鎖並喚醒等待的線程。

也就是說我們會將signalNotEmpty();修改為if (c == 0) signalNotEmpty();,而把signalNotFull();修改為if (c == items.length) signalNotFull();,也就是隻有在必要的時候纔去喚醒另一種類型的線程。但是這種修改又會引入另外一種問題,例如有N個消費者線程在等待隊列非空,這時有兩個生產者線程插入了兩個元素,但是這兩個插入操作是連續發生的,也就是說只有第一個生產者線程在插入元素之後調用了signalNotEmpty(),第二個線程看到隊列原本是非空的就不會調用喚醒方法。在這種情況下,實際就只有一個消費者線程被喚醒了,而實際上隊列中還有一個元素可供消費。那麼我們如何解決這個問題呢?

比較簡單的一種方法就是,生產者線程和消費者線程不止會喚醒另一種類型的線程,而且也會喚醒同類型的線程。比如在生產者線程中如果插入元素之後發現隊列還未滿,那麼就可以調用notFull.signal()方法來喚醒其他可能存在的等待狀態的生產者線程,對於消費者線程所使用的take()方法也是類似的處理方式。相對來說signal方法較低,而互斥鎖的lock方法成本較高,而且會影響到另一種類型線程的運行。所以通過這種方式儘可能地少調用signalNotEmpty()signalNotFull()方法會是一種還不錯的優化手段。

優化後的put()take()方法如下:

/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件得到滿足
notFull.await();
}

// 執行入隊操作,將對象e實際放入隊列中
enqueue(e);

// 增加元素總數
c = count.getAndIncrement();

// 如果在插入後隊列仍然沒滿,則喚醒其他等待插入的線程
if (c + 1 < items.length)
notFull.signal();

} finally {
putLock.unlock();
}

// 如果插入之前隊列為空,才喚醒等待彈出元素的線程
// 為了防止死鎖,不能在釋放putLock之前獲取takeLock
if (c == 0)
signalNotEmpty();
}

/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列為空時進入休眠
// 等待隊列非空條件得到滿足
notEmpty.await();
}

// 執行出隊操作,將隊列中的第一個元素彈出
e = dequeue();

// 減少元素總數
c = count.getAndDecrement();

// 如果隊列在彈出一個元素後仍然非空,則喚醒其他等待隊列非空的線程
if (c - 1 > 0)
notEmpty.signal();
} finally {
takeLock.unlock();
}

// 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程
// 為了防止死鎖,不能在釋放takeLock之前獲取putLock
if (c == items.length)
signalNotFull();

return e;
}

成品出爐!

恭喜大家,經過一番漫長的探索,我們終於徹底完成了我們的阻塞隊列實現之旅。如果你能堅持到這裡,我相信你已經對多線程編程的實踐方法有了非常深刻的理解。最後讓我們來看一看我們最終完成的成品代碼——一個完整的阻塞隊列實現吧。

完整的阻塞隊列代碼

public class BlockingQueue {

/** 存放元素的數組 */
private final Object[] items;

/** 彈出元素的位置 */
private int takeIndex;

/** 插入元素的位置 */
private int putIndex;

/** 隊列中的元素總數 */
private AtomicInteger count = new AtomicInteger(0);

/** 插入鎖 */
private final ReentrantLock putLock = new ReentrantLock();

/** 隊列未滿的條件變數 */
private final Condition notFull = putLock.newCondition();

/** 彈出鎖 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 隊列非空的條件變數 */
private final Condition notEmpty = takeLock.newCondition();

/**
* 指定隊列大小的構造器
*
* @param capacity 隊列大小
*/
public BlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
items = new Object[capacity];
}

/**
* 入隊操作
*
* @param e 待插入的對象
*/
private void enqueue(Object e) {
// 將對象e放入putIndex指向的位置
items[putIndex] = e;

// putIndex向後移一位,如果已到末尾則返回隊列開頭(位置0)
if (++putIndex == items.length)
putIndex = 0;
}

/**
* 出隊操作
*
* @return 被彈出的元素
*/
private Object dequeue() {
// 取出takeIndex指向位置中的元素
// 並將該位置清空
Object e = items[takeIndex];
items[takeIndex] = null;

// takeIndex向後移一位,如果已到末尾則返回隊列開頭(位置0)
if (++takeIndex == items.length)
takeIndex = 0;

// 返回之前代碼中取出的元素e
return e;
}

/**
* 將指定元素插入隊列
*
* @param e 待插入的對象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == items.length) {
// 隊列已滿時進入休眠
// 等待隊列未滿條件得到滿足
notFull.await();
}

// 執行入隊操作,將對象e實際放入隊列中
enqueue(e);

// 增加元素總數
c = count.getAndIncrement();

// 如果在插入後隊列仍然沒滿,則喚醒其他等待插入的線程
if (c + 1 < items.length)
notFull.signal();

} finally {
putLock.unlock();
}

// 如果插入之前隊列為空,才喚醒等待彈出元素的線程
// 為了防止死鎖,不能在釋放putLock之前獲取takeLock
if (c == 0)
signalNotEmpty();
}

/**
* 喚醒等待隊列非空條件的線程
*/
private void signalNotEmpty() {
// 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock
takeLock.lock();
try {
// 喚醒一個等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* 從隊列中彈出一個元素
*
* @return 被彈出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 隊列為空時進入休眠
// 等待隊列非空條件得到滿足
notEmpty.await();
}

// 執行出隊操作,將隊列中的第一個元素彈出
e = dequeue();

// 減少元素總數
c = count.getAndDecrement();

// 如果隊列在彈出一個元素後仍然非空,則喚醒其他等待隊列非空的線程
if (c - 1 > 0)
notEmpty.signal();
} finally {
takeLock.unlock();
}

// 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程
// 為了防止死鎖,不能在釋放takeLock之前獲取putLock
if (c == items.length)
signalNotFull();

return e;
}

/**
* 喚醒等待隊列未滿條件的線程
*/
private void signalNotFull() {
// 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock
putLock.lock();
try {
// 喚醒一個等待隊列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}

}

有興趣的讀者可以把我們完成的這個阻塞隊列類和JDK中的java.util.concurrent.LinkedBlockingQueue類做一個比較,相信大家可以發現這兩個類非常的相似,這足以說明我們費勁千辛萬苦實現的這個阻塞隊列類已經非常接近JDK中的阻塞隊列類的質量了。

總結

恭喜大家終於完整地讀完了這篇文章!在這篇文章中,我們從一個最簡單的阻塞隊列版本開始,一路解決了各種問題,最終得到了一個完整、高質量的阻塞隊列實現。我們一起來回憶一下我們解決的問題吧。從最簡單的阻塞隊列開始,我們首先用互斥鎖synchronized關鍵字解決了並發控制問題,保證了隊列在多線程訪問情況下的正確性。然後我們用條件變數Object.wati()Object.notifyAll()解決了休眠喚醒問題,使隊列的效率得到了飛躍性地提高。為了保障隊列的安全性,不讓外部代碼可以訪問到我們所使用的對象鎖和條件變數,所以我們使用了顯式鎖ReentrantLock,並通過鎖對象locknewCondition()方法創建了與其相對應的條件變數對象。最後,我們對隊列中的條件變數和互斥鎖分別做了拆分,使隊列的效率得到了進一步的提高。當然,最後我們還加上了一點對喚醒操作的有條件調用優化,使整個阻塞隊列的實現變得更加完善。


推薦閱讀:
相關文章