前文回顧

  • AQS源碼詳細解讀
  • ReentrantLock源碼詳細解讀

本文簡單介紹下各種阻塞隊列,然後詳細看下LinkedBlockingQueue的源碼。其實阻塞隊列的實現,在思想上有很大一部分是相通的。而且在線程池中也會用到各種BlockingQueue,FixedThreadPool使用的就是LinkedBlockingQueue。知道一個以後其他的理解起來也不難。LinkedBlockingQueue的實現依賴於入隊鎖(putLock)和出隊鎖(takeLock),實際上就是一個ReentrantLock實現的生產者-消費者模式。不瞭解的同學可以先學習一下~

文章導讀

  • 阻塞隊列簡述
  • LinkedBlockingQueue基本屬性
  • 入隊源碼,add(),offer(),put(),enqueue()
  • 出隊源碼,poll(),take(),dequeue()
  • 總結

一、簡單介紹各種阻塞隊列

阻塞隊列大致可以分為這幾種:ArrayBlockingQueue,LinkedBlockingQueue,ConcurrentLinkedQueue,DelayQueue,LinkedTransferQueue,SynchronusQueue。

ArrayBlockingQueue--數組實現的有界隊列

會自動阻塞,根據調用api不同,有不同特性,當隊列容量不足時,有阻塞能力。

boolean add(E e):在容量不足時,拋出異常。

void put(E e):在容量不足時,阻塞等待。boolean offer(E e):不阻塞,容量不足時返回false,當前新增數據操作放棄。boolean offer(E e, long timeout, TimeUnit unit):容量不足時,阻塞times時長(單位為timeunit),如果在阻塞時長內,有容量空閑,新增數據返回true。如果阻塞時長範圍內,無容量空閑,放棄新增數據,返回false。

LinkedBlockingQueue--鏈式隊列,隊列容量不足或為0時自動阻塞

void put(E e):自動阻塞,隊列容量滿後,自動阻塞。E take():自動阻塞,隊列容量為0後,自動阻塞。

ConcurrentLinkedQueue--基礎鏈表同步隊列

boolean offer(E e):入隊。E peek():查看queue中的首數據。

E poll():取出queue中的首數據。

DelayQueue--延時隊列

根據比較機制,實現自定義處理順序的隊列。常用於定時任務,如:定時關機。int compareTo(Delayed o):比較大小,自動升序。比較方法建議和getDelay方法配合完成。如果在DelayQueue是需要按時完成的計劃任務,必須配合getDelay方法完成。long getDelay(TimeUnit unit):獲取計劃時長的方法,根據參數TimeUnit來決定,如何返回結果值。

LinkedTransferQueue--轉移隊列

boolean add(E e):隊列會保存數據,不做阻塞等待。void transfer(E e):是TransferQueue的特有方法。必須有消費者(take()方法調用者)。如果沒有任意線程消費數據,transfer方法阻塞。一般用於處理及時消息。

SynchronousQueue--同步隊列,容量為0

是特殊的TransferQueue,必須先有消費線程等待,才能使用的隊列。

boolean add(E e):父類方法,無阻塞,若沒有消費線程阻塞等待數據,則拋出異常。put(E e):有阻塞,若沒有消費線程阻塞等待數據,則阻塞。

二、LinkedBlockingQueue的基本屬性

先來看看節點是怎麼定義的。

static class Node<E> {
//節點的value
E item;
//下一個節點
Node<E> next;
Node(E x) { item = x; }
}

節點很簡單,實現了一個單向鏈表,再看看其他重要成員變數。LinkedBlockingQueue記錄了隊列容量,節點數量,頭尾節點,出隊鎖,入隊鎖。便於後續的入隊和出隊操作。

//隊列容量
private final int capacity;

//當前隊列元素數量
private final AtomicInteger count = new AtomicInteger();

//頭節點,不存數據
transient Node<E> head;

//尾節點,便於入隊
private transient Node<E> last;

//出隊鎖,只有take,poll方法會持有
private final ReentrantLock takeLock = new ReentrantLock();

//出隊等待條件
private final Condition notEmpty = takeLock.newCondition();

//入隊鎖,只有put,offer會持有
private final ReentrantLock putLock = new ReentrantLock();

//入隊等待條件
private final Condition notFull = putLock.newCondition();

三、入隊源碼

關於入隊的方法有add,offer,put。一般不使用add方法,因為add失敗會拋異常。put會阻塞,而offer有兩個重載方法,下面這種可設置超時時間的支持中斷操作。其實真正的數據結構層面的入隊方法是void enqueue(Node<E> node),其他入隊方法都是對入隊前條件進行判斷,再次對這個方法封裝。

AbstractQueed.add():入隊時,我們可以調用父類的添加方法。一般不推薦,如果隊列滿了,就會拋異常。

public boolean add(E e) {
//使用了模板方法模式,offer()是子類實現的
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

offer(E e,long timeout, TimeUnit unit):在隊尾插入一個元素,超時則不嘗試,支持獲取鎖時中斷。當前節點入隊後,如果隊列沒滿,就喚醒一個入隊的線程讓其入隊。

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
//轉換為納秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//獲取入隊鎖,支持等待鎖的過程中被中斷
putLock.lockInterruptibly();
try {
//隊列滿了,再看看有沒有超時
while (count.get() == capacity) {
//等待時間超時
if (nanos <= 0)
return false;
//進行等待,awaitNanos(long nanos)是AQS中的方法,之前的文章中有寫過
//在等待過程中,如果被喚醒或超時,則繼續當前循環
//如果被中斷,則拋出中斷異常
nanos = notFull.awaitNanos(nanos);
}
//進入隊尾
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//說明當前元素後面還能再插入一個
//就喚醒一個入隊條件隊列中阻塞的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//節點數量為0,說明隊列是空的
if (c == 0)
//喚醒一個出隊條件隊列阻塞的線程
signalNotEmpty();
return true;
}

put():在隊尾插元素,如果隊列滿了則一直阻塞,直到隊列不滿了或被中斷了。

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
//規定給當前put方法預留一個本地變數
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//隊列滿了,進入阻塞狀態
while (count.get() == capacity) {
notFull.await();
}
//正常入隊
enqueue(node);
//count.getAndIncrement()返回修改之前的c,就是count通過CAS操作已經+1了
//但是返回的還是之前的節點數
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//當入隊前的節點數是0時,可能線程阻塞在take()方法
//喚醒一個
if (c == 0)
signalNotEmpty();
}

enqueue():節點入隊。此時入隊鎖必然被當前線程持有,而且隊尾元素的next必然為空。

private void enqueue(Node<E> node) {
//從右往左賦值
last = last.next = node;
}

瞭解一下offer與put的區別

offer方法有兩個重載方法,可以根據等待超時條件進行入隊,是非阻塞的;put時如果隊列滿了,則會阻塞,直到隊列為空且被喚醒或中斷。

四、出隊源碼

出隊方法隨之對應的有poll(),take()以及針對於具體節點出隊操作的dequeue()。poll()和take()都是對dequeue()的再次封裝。

poll():poll()有兩個重載方法,我們就看比較複雜的這個。如果有元素直接出隊,如果隊列為空。判斷是否超時,超時返回null,未超時則接著等待,直到隊列不為空或等待超時或被中斷。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//隊列為空且已經超時,直接返回空
if (nanos <= 0)
return null;
//等待過程中可能被喚醒,超時,中斷
nanos = notEmpty.awaitNanos(nanos);
}
//進行出隊操作
x = dequeue();
c = count.getAndDecrement();
//如果節點數1,說明可以喚醒一個被阻塞的出隊線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果出隊前,隊列是滿的,則喚醒一個被take()阻塞的線程
if (c == capacity)
signalNotFull();
return x;
}

take():如果隊列為空,則阻塞,直到被中斷或不為空。

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//獲取出隊鎖
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//隊列為空,進入阻塞狀態
notEmpty.await();
}
//出隊操作
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

dequeue():節點出隊。執行前確保出隊鎖已經被當前線程持有且頭節點的item為空,因為頭節點是方便操作的。

private E dequeue() {
//不存放數據
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}

瞭解一下poll與take的區別

與offer和put區別類似,offer非阻塞,put會阻塞。

總結

整體順下來之後,阻塞隊列也不難。實際上就是ReentrantLock實現的生產者-消費者模式。

offer與poll是非阻塞的,put與take是阻塞的。

入隊使用入隊鎖,出隊使用出隊鎖。

單形參的offer與poll中調用的是lock()方法,阻塞獲取鎖資源。

如果喜歡我的文章,歡迎關注我的專欄~


推薦閱讀:
相關文章