來源:勞夫子
鏈接:http://www.cnblogs.com/liuyun1995/p/8474026.html
Java併發系列|Semaphore源碼分析


Semaphore(信號量)是JUC包中比較常用到的一個類,它是AQS共享模式的一個應用,可以允許多個線程同時對共享資源進行操作,並且可以有效的控制併發數,利用它可以很好的實現流量控制。

Semaphore提供了一個許可證的概念,可以把這個許可證看作公共汽車車票,只有成功獲取車票的人才能夠上車,並且車票是有一定數量的,不可能毫無限制的發下去,這樣就會導致公交車超載。所以當車票發完的時候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那麼他的位置將會空閒出來,因此如果這時其他人想要上車的話就又可以獲得車票了。

利用Semaphore可以實現各種池,我們在本篇末尾將會動手寫一個簡易的數據庫連接池。首先我們來看一下Semaphore的構造器。

//構造器1
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//構造器2
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的信號量在獲取許可證時會採用非公平方式獲取,使用構造器2可以通過參數指定獲取許可證的方式(公平or非公平)。

Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,默認的是獲取和釋放一個許可證,也可以傳入參數來同時獲取和釋放多個許可證。在本篇中我們只講每次獲取和釋放一個許可證的情況。

1、獲取許可證

//獲取一個許可證(響應中斷)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//獲取一個許可證(不響應中斷)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//嘗試獲取許可證(非公平獲取)
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//嘗試獲取許可證(定時獲取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

上面的API是Semaphore提供的默認獲取許可證操作。每次只獲取一個許可證,這也是現實生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之後可能會阻塞線程,而嘗試獲取則不會。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時我們比較常用到的是acquire方法去獲取許可證。下面我們就來看看它是怎樣獲取的。可以看到acquire方法裏面直接就是調用sync.acquireSharedInterruptibly,這個方法是AQS裏面的方法,我們簡單講一下。

//以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//首先判斷線程是否中斷, 如果是則拋出異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//1.嘗試去獲取鎖
if (tryAcquireShared(arg) < 0) {
//2. 如果獲取失敗則進人該方法
doAcquireSharedInterruptibly(arg);
}
}

acquireSharedInterruptibly方法首先就是去調用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS裏面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。

abstract static class Sync extends AbstractQueuedSynchronizer {
//非公平方式嘗試獲取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取可用許可證
int available = getState();
//獲取剩餘許可證
int remaining = available - acquires;
//1.如果remaining小於0則直接返回remaining
//2.如果remaining大於0則先更新同步狀態再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
//非公平同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//嘗試獲取許可證
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//公平同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//嘗試獲取許可證
protected int tryAcquireShared(int acquires) {
for (;;) {
//判斷同步隊列前面有沒有人排隊
if (hasQueuedPredecessors()) {
//如果有的話就直接返回-1,表示嘗試獲取失敗
return -1;
}
//獲取可用許可證
int available = getState();
//獲取剩餘許可證
int remaining = available - acquires;
//1.如果remaining小於0則直接返回remaining
//2.如果remaining大於0則先更新同步狀態再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}

這裏需要注意的是NonfairSync的tryAcquireShared方法直接調用的是nonfairTryAcquireShared方法,這個方法是在父類Sync裏面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去參入的參數,如果結果不小於0的話證明還有可用的許可證,那麼就直接使用CAS操作更新同步狀態的值,最後不管結果是否小於0都會返回該結果值。

這裏我們要了解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前線程獲取成功但後續線程不能再獲取,正數表示當前線程獲取成功並且後續線程也能夠獲取。我們再來看acquireSharedInterruptibly方法的代碼。

//以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//首先判斷線程是否中斷, 如果是則拋出異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//1.嘗試去獲取鎖
//負數:表示獲取失敗
//零值:表示當前線程獲取成功, 但是後繼線程不能再獲取了
//正數:表示當前線程獲取成功, 並且後繼線程同樣可以獲取成功
if (tryAcquireShared(arg) < 0) {
//2. 如果獲取失敗則進人該方法
doAcquireSharedInterruptibly(arg);
}
}

如果返回的remaining小於0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就爲true,所以接下來就會調用doAcquireSharedInterruptibly方法,這個方法我們在講AQS的時候講過,它會將當前線程包裝成結點放入同步隊列尾部,並且有可能掛起線程。這也是當remaining小於0時線程會排隊阻塞的原因。

而如果返回的remaining>=0的話就代表當前線程獲取成功,因此tryAcquireShared(arg) < 0就爲flase,所以就不會再去調用doAcquireSharedInterruptibly方法阻塞當前線程了。

以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此之前先去調用hasQueuedPredecessors方法判斷同步隊列是否有人在排隊,如果有的話就直接return -1表示獲取失敗,否則才繼續執行下面和非公平獲取一樣的步驟。

2、釋放許可證

//釋放一個許可證
public void release() {
sync.releaseShared(1);
}

調用release方法是釋放一個許可證,它的操作很簡單,就調用了AQS的releaseShared方法,我們來看看這個方法。

//釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
//1.嘗試去釋放鎖
if (tryReleaseShared(arg)) {
//2.如果釋放成功就喚醒其他線程
doReleaseShared();
return true;
}
return false;
}

AQS的releaseShared方法首先調用tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync裏面。

abstract static class Sync extends AbstractQueuedSynchronizer {
...
//嘗試釋放操作
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當前同步狀態
int current = getState();
//將當前同步狀態加上傳入的參數
int next = current + releases;
//如果相加結果小於當前同步狀態的話就報錯
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
//以CAS方式更新同步狀態的值, 更新成功則返回true, 否則繼續循環
if (compareAndSetState(current, next)) {
return true;
}
}
}
...
}

可以看到tryReleaseShared方法裏面採用for循環進行自旋,首先獲取同步狀態,將同步狀態加上傳入的參數,然後以CAS方式更新同步狀態,更新成功就返回true並跳出方法,否則就繼續循環直到成功爲止,這就是Semaphore釋放許可證的流程。

3、動手寫個連接池

Semaphore代碼並沒有很複雜,常用的操作就是獲取和釋放一個許可證,這些操作的實現邏輯也都比較簡單,但這並不妨礙Semaphore的廣泛應用。下面我們就來利用Semaphore實現一個簡單的數據庫連接池,通過這個例子希望讀者們能更加深入的掌握Semaphore的運用。

public class ConnectPool {
//連接池大小
private int size;
//數據庫連接集合
private Connect[] connects;
//連接狀態標誌
private boolean[] connectFlag;
//剩餘可用連接數
private volatile int available;
//信號量
private Semaphore semaphore;
//構造器
public ConnectPool(int size) {
this.size = size;
this.available = size;
semaphore = new Semaphore(size, true);
connects = new Connect[size];
connectFlag = new boolean[size];
initConnects();
}
//初始化連接
private void initConnects() {
//生成指定數量的數據庫連接
for(int i = 0; i < this.size; i++) {
connects[i] = new Connect();
}
}
//獲取數據庫連接
private synchronized Connect getConnect(){
for(int i = 0; i < connectFlag.length; i++) {
//遍歷集合找到未使用的連接
if(!connectFlag[i]) {
//將連接設置爲使用中
connectFlag[i] = true;
//可用連接數減1
available--;
System.out.println("【"+Thread.currentThread().getName()+"】以獲取連接 剩餘連接數:" + available);
//返回連接引用
return connects[i];
}
}
return null;
}
//獲取一個連接
public Connect openConnect() throws InterruptedException {
//獲取許可證
semaphore.acquire();
//獲取數據庫連接
return getConnect();
}
//釋放一個連接
public synchronized void release(Connect connect) {
for(int i = 0; i < this.size; i++) {
if(connect == connects[i]){
//將連接設置爲未使用
connectFlag[i] = false;
//可用連接數加1
available++;
System.out.println("【"+Thread.currentThread().getName()+"】以釋放連接 剩餘連接數:" + available);
//釋放許可證
semaphore.release();
}
}
}
//剩餘可用連接數
public int available() {
return available;
}
}

測試代碼:

public class TestThread extends Thread {
private static ConnectPool pool = new ConnectPool(3);
@Override
public void run() {
try {
Connect connect = pool.openConnect();
Thread.sleep(100); //休息一下
pool.release(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
new TestThread().start();
}
}
}

測試結果:


Java併發系列|Semaphore源碼分析


我們使用一個數組來存放數據庫連接的引用,在初始化連接池的時候會調用initConnects方法創建指定數量的數據庫連接,並將它們的引用存放到數組中,此外還有一個相同大小的數組來記錄連接是否可用。

每當外部線程請求獲取一個連接時,首先調用semaphore.acquire()方法獲取一個許可證,然後將連接狀態設置爲使用中,最後返回該連接的引用。許可證的數量由構造時傳入的參數決定,每調用一次semaphore.acquire()方法許可證數量減1,當數量減爲0時說明已經沒有連接可以使用了,這時如果其他線程再來獲取就會被阻塞。

每當線程釋放一個連接的時候會調用semaphore.release()將許可證釋放,此時許可證的總量又會增加,代表可用的連接數增加了,那麼之前被阻塞的線程將會醒來繼續獲取連接,這時再次獲取就能夠成功獲取連接了。

測試示例中初始化了一個3個連接的連接池,我們從測試結果中可以看到,每當線程獲取一個連接剩餘的連接數將會減1,等到減爲0時其他線程就不能再獲取了,此時必須等待一個線程將連接釋放之後才能繼續獲取。可以看到剩餘連接數總是在0到3之間變動,說明我們這次的測試是成功的。

相關文章