前言

原以為線程池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個線程池來更加深入的了解它;但在動手寫的過程中落地到細節時發現並沒想的那麼容易。結合源碼對比後確實不得不佩服 Doug Lea

我覺得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源碼時都是看一個大概,因為其中涉及到了許多細節處理,還有部分 AQS 的內容,所以想要理清楚具體細節並不是那麼容易。

與其挨個分析源碼不如自己實現一個簡版,當然簡版並不意味著功能缺失,需要保證核心邏輯一致。

所以也是本篇文章的目的:

自己動手寫一個五臟俱全的線程池,同時會了解到線程池的工作原理,以及如何在工作中合理的利用線程池。

再開始之前建議對線程池不是很熟悉的朋友看看這幾篇:

這裡我截取了部分內容,也許可以埋個伏筆(坑)。


具體請看這兩個鏈接。

  • 如何優雅的使用和理解線程池
  • 線程池中你不容錯過的一些細節

由於篇幅限制,本次可能會分為上下兩篇。

創建線程池

現在進入正題,新建了一個 CustomThreadPool 類,它的工作原理如下:

簡單來說就是往線程池裡邊丟任務,丟的任務會緩衝到隊列里;線程池裡存儲的其實就是一個個的 Thread ,他們會一直不停的從剛才緩衝的隊列里獲取任務執行。

流程還是挺簡單。

先來看看我們這個自創的線程池的效果如何吧:

初始化了一個核心為3、最大線程數為5、隊列大小為 4 的線程池。

先往其中丟了 10 個任務,由於阻塞隊列的大小為 4 ,最大線程數為 5 ,所以由於隊列里緩衝不了最終會創建 5 個線程(上限)。

過段時間沒有任務提交後(sleep)則會自動縮容到三個線程(保證不會小於核心線程數)。

構造函數

來看看具體是如何實現的。

下面則是這個線程池的構造函數:

會有以下幾個核心參數:

  • miniSize 最小線程數,等效於 ThreadPool 中的核心線程數。
  • maxSize 最大線程數。
  • keepAliveTime 線程保活時間。
  • workQueue 阻塞隊列。
  • notify 通知介面。

大致上都和 ThreadPool 中的參數相同,並且作用也是類似的。

需要注意的是其中初始化了一個 workers 成員變數:

1 /**
2 * 存放線程池
3 */
4 private volatile Set<Worker> workers;
5
6 public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
7 TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
8
9 workers = new ConcurrentHashSet<>();
10 }

workers 是最終存放線程池中運行的線程,在 j.u.c 源碼中是一個 HashSet 所以對他所有的操作都是需要加鎖。

我這裡為了簡便起見就自己定義了一個線程安全的 Set 稱為 ConcurrentHashSet

其實原理也非常簡單,和 HashSet 類似也是藉助於 HashMap 來存放數據,利用其 key 不可重複的特性來實現 set ,只是這裡的 HashMap 是用並發安全的 ConcurrentHashMap 來實現的。

這樣就能保證對它的寫入、刪除都是線程安全的。

不過由於 ConcurrentHashMapsize() 函數並不準確,所以我這裡單獨利用了一個 AtomicInteger 來統計容器大小。

創建核心線程

往線程池中丟一個任務的時候其實要做的事情還蠻多的,最重要的事情莫過於創建線程存放到線程池中了。

當然我們不能無限制的創建線程,不然拿線程池來就沒任何意義了。於是 miniSize maxSize 這兩個參數就有了它的意義。

但這兩個參數再哪一步的時候才起到作用呢?這就是首先需要明確的。

從這個流程圖可以看出第一步是需要判斷是否大於核心線程數,如果沒有則創建。

結合代碼可以發現在執行任務的時候會判斷是否大於核心線程數,從而創建線程。

worker.startTask() 執行任務部分放到後面分析。

這裡的 miniSize 由於會在多線程場景下使用,所以也用 volatile 關鍵字來保證可見性。

隊列緩衝

結合上面的流程圖,第二步自然是要判斷隊列是否可以存放任務(是否已滿)。

優先會往隊列里存放。

上至封頂

一旦寫入失敗則會判斷當前線程池的大小是否大於最大線程數,如果沒有則繼續創建線程執行。

不然則執行會嘗試阻塞寫入隊列(j.u.c 會在這裡執行拒絕策略)

以上的步驟和剛才那張流程圖是一樣的,這樣大家是否有看出什麼坑嘛?

時刻小心

從上面流程圖的這兩步可以看出會直接創建新的線程

這個過程相對於中間直接寫入阻塞隊列的開銷是非常大的,主要有以下兩個原因:

  • 創建線程會加鎖,雖說最終用的是 ConcurrentHashMap 的寫入函數,但依然存在加鎖的可能。
  • 會創建新的線程,創建線程還需要調用操作系統的 API 開銷較大。

所以理想情況下我們應該避免這兩步,盡量讓丟入線程池中的任務進入阻塞隊列中。

執行任務

任務是添加進來了,那是如何執行的?

在創建任務的時候提到過 worker.startTask() 函數:

1 /**
2 * 添加任務,需要加鎖
3 * @param runnable 任務
4 */
5 private void addWorker(Runnable runnable) {
6 Worker worker = new Worker(runnable, true);
7 worker.startTask();
8 workers.add(worker);
9 }

也就是在創建線程執行任務的時候會創建 Worker 對象,利用它的 startTask() 方法來執行任務。

所以先來看看 Worker 對象是長啥樣的:

其實他本身也是一個線程,將接收到需要執行的任務存放到成員變數 task 處。

而其中最為關鍵的則是執行任務 worker.startTask() 這一步驟。

1 public void startTask() {
2 thread.start();
3 }

其實就是運行了 worker 線程自己,下面來看 run 方法。

  • 第一步是將創建線程時傳過來的任務執行(task.run),接著會一直不停的從隊列里獲取任務執行,直到獲取不到新任務了。
  • 任務執行完畢後將內置的計數器 -1 ,方便後面任務全部執行完畢進行通知。
  • worker 線程獲取不到任務後退出,需要將自己從線程池中釋放掉(workers.remove(this))。

從隊列里獲取任務

其實 getTask 也是非常關鍵的一個方法,它封裝了從隊列中獲取任務,同時對不需要保活的線程進行回收。

很明顯,核心作用就是從隊列里獲取任務;但有兩個地方需要注意:

  • 當線程數超過核心線程數時,在獲取任務的時候需要通過保活時間從隊列里獲取任務;一旦獲取不到任務則隊列肯定是空的,這樣返回 null 之後在上文的 run() 中就會退出這個線程;從而達到了回收線程的目的,也就是我們之前演示的效果

  • 這裡需要加鎖,加鎖的原因是這裡肯定會出現並發情況,不加鎖會導致 workers.size() &gt; miniSize 條件多次執行,從而導致線程被全部回收完畢。

關閉線程池

最後來談談線程關閉的事;

還是以剛才那段測試代碼為例,如果提交任務後我們沒有關閉線程,會發現即便是任務執行完畢後程序也不會退出。

從剛才的源碼里其實也很容易看出來,不退出的原因是 Worker 線程一定還會一直阻塞在 task = workQueue.take();處,即便是線程縮容了也不會小於核心線程數。

通過堆棧也能證明:

恰好剩下三個線程阻塞於此處。

而關閉線程通常又有以下兩種:

  • 立即關閉:執行關閉方法後不管現在線程池的運行狀況,直接一刀切全部停掉,這樣會導致任務丟失。
  • 不接受新的任務,同時等待現有任務執行完畢後退出線程池。

立即關閉

我們先來看第一種立即關閉

1 /**
2 * 立即關閉線程池,會造成任務丟失
3 */
4 public void shutDownNow() {
5 isShutDown.set(true);
6 tryClose(false);
7 }
8
9 /**
10 * 關閉線程池
11 *
12 * @param isTry true 嘗試關閉 --> 會等待所有任務執行完畢
13 * false 立即關閉線程池--> 任務有丟失的可能
14 */
15 private void tryClose(boolean isTry) {
16 if (!isTry) {
17 closeAllTask();
18 } else {
19 if (isShutDown.get() && totalTask.get() == 0) {
20 closeAllTask();
21 }
22 }
23
24 }
25
26 /**
27 * 關閉所有任務
28 */
29 private void closeAllTask() {
30 for (Worker worker : workers) {
31 //LOGGER.info("開始關閉");
32 worker.close();
33 }
34 }
35
36 public void close() {
37 thread.interrupt();
38 }

很容易看出,最終就是遍歷線程池裡所有的 worker 線程挨個執行他們的中斷函數。

我們來測試一下:

可以發現後面丟進去的三個任務其實是沒有被執行的。

完事後關閉

正常關閉則不一樣:

1 /**
2 * 任務執行完畢後關閉線程池
3 */
4 public void shutdown() {
5 isShutDown.set(true);
6 tryClose(true);
7 }

他會在這裡多了一個判斷,需要所有任務都執行完畢之後才會去中斷線程。

同時在線程需要回收時都會嘗試關閉線程:


來看看實際效果:

回收線程

上文或多或少提到了線程回收的事情,其實總結就是以下兩點:

  • 一旦執行了 shutdown/shutdownNow 方法都會將線程池的狀態置為關閉狀態,這樣只要 worker 線程嘗試從隊列里獲取任務時就會直接返回空,導致 worker 線程被回收。

  • 一旦線程池大小超過了核心線程數就會使用保活時間來從隊列里獲取任務,所以一旦獲取不到返回 null 時就會觸發回收。

但如果我們的隊列足夠大,導致線程數都不會超過核心線程數,這樣是不會觸發回收的。

比如這裡我將隊列大小調為 10 ,這樣任務就會累計在隊列里,不會創建五個 worker 線程。

所以一直都是 Thread-1~3 這三個線程在反覆調度任務。

總結

本次實現了線程池裡大部分核心功能,我相信只要看完並動手敲一遍一定會對線程池有不一樣的理解。

結合目前的內容來總結下:

  • 線程池、隊列大小要設計的合理,盡量的讓任務從隊列中獲取執行。
  • 慎用 shutdownNow() 方法關閉線程池,會導致任務丟失(除非業務允許)。
  • 如果任務多,線程執行時間短可以調大 keepalive 值,使得線程盡量不被回收從而可以復用線程。

同時下次會分享一些線程池的新特性,如:

  • 執行帶有返回值的線程。
  • 異常處理怎麼辦?
  • 所有任務執行完怎麼通知我?

本文所有源碼:

github.com/crossoverJie

你的點贊與分享是對我最大的支持


推薦閱讀:
相关文章