Java 線程線程池初探

作者:疼蛋之丸

來源:https://www.jianshu.com/p/5d5198b434a2

一、線程池初探

所謂線程池,就是將多個線程放在一個池子裏面(所謂池化技術),然後需要線程的時候不是創建一個線程,而是從線程池裏面獲取一個可用的線程,然後執行我們的任務。線程池的關鍵在於它爲我們管理了多個線程,我們不需要關心如何創建線程,我們只需要關係我們的核心業務,然後需要線程來執行任務的時候從線程池中獲取線程。任務執行完之後線程不會被銷燬,而是會被重新放到池子裏面,等待機會去執行任務。

我們爲什麼需要線程池呢?首先一點是線程池爲我們提高了一種簡易的多線程編程方案,我們不需要投入太多的精力去管理多個線程,線程池會自動幫我們管理好,它知道什麼時候該做什麼事情,我們只要在需要的時候去獲取就可以了。其次,我們使用線程池很大程度上歸咎於創建和銷燬線程的代價是非常昂貴的,甚至我們創建和銷燬線程的資源要比我們實際執行的任務所花費的時間還要長,這顯然是不科學也是不合理的,而且如果沒有一個合理的管理者,可能會出現創建了過多的線程的情況,也就是在JVM中存活的線程過多,而存活着的線程也是需要銷燬資源的,另外一點,過多的線程可能會造成線程過度切換的尷尬境地。

對線程池有了一個初步的認識之後,我們來看看如何使用線程池。

  1. 創建一個線程池


ExecutorService executorService = Executors.newFixedThreadPool(1);


  1. 提交任務


executorService.submit(() -> System.out.println("run"));
Future stringFuture = executorService.submit(() -> "run");
  1. 創建一個調度線程池


ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);


  1. 提交一個週期性執行的任務


scheduledExecutorService
.scheduleAtFixedRate(() -> System.out.println("schedule"), 0, 1, TimeUnit.SECONDS);
  1. shutdown


 executorService.shutdownNow();
scheduledExecutorService.shutdownNow();

可以發現使用線程池非常簡單,只需要極少的代碼就可以創建出我們需要的線程池,然後將我們的任務提交到線程池中去。我們只需要在結束之時記得關閉線程池就可以了。本文的重點並非在於如何使用線程池,而是試圖剖析線程池的實現,比如一個調度線程池是怎麼實現的?是靠什麼實現的?爲什麼能這樣實現等等問題。

二、Java線程池實現架構

Java中與線程池相關的類有下面一些:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Executors

通過上面一節中的使用示例,可以發現Executors類是一個創建線程池的有用的類,事實上,Executors類的角色也就是創建線程池,它是一個工廠類,可以產生不同類型的線程池,而Executor是線程池的鼻祖類,它有兩個子類是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor則是真正的線程池,我們的任務將被這兩個類交由其所管理者的線程池運行,可以發現,ScheduledThreadPoolExecutor是一個集大成者類,下面我們可以看看它的類關係圖:


Java 線程線程池初探


ScheduledThreadPoolExecutor的類關係圖

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,ThreadPoolExecutor實現了一般的線程池,沒有調度功能,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實現,然後增加了調度功能。

最爲原始的Executor只有一個方法execute,它接受一個Runnable類型的參數,意思是使用線程池來執行這個Runnable,可以發現Executor不提供有返回值的任務。ExecutorService繼承了Executor,並且極大的增強了Executor的功能,不僅支持有返回值的任務執行,而且還有很多十分有用的方法來爲你提供服務,下面展示了ExecutorService提供的方法:


Java 線程線程池初探


ExecutorService提供的方法

ScheduledExecutorService繼承了ExecutorService,並且增加了特有的調度(schedule)功能。關於Executor、ExecutorService和ScheduledExecutorService的關係,可以見下圖:


Java 線程線程池初探


Executor、ExecutorService和ScheduledExecutorService的關係

總結一下,經過我們的調研,可以發現其實對於我們編寫多線程代碼來說,最爲核心的是Executors類,根據我們是需要ExecutorService類型的線程池還是ScheduledExecutorService類型的線程池調用相應的工廠方法就可以了,而ExecutorService的實現表現在ThreadPoolExecutor上,ScheduledExecutorService的實現則表現在ScheduledThreadPoolExecutor上,下文將分別剖析這兩者,嘗試弄清楚線程池的原理。

三、ThreadPoolExecutor解析

上文中描述了Java中線程池相關的架構,瞭解了這些內容其實我們就可以使用java的線程池爲我們工作了,使用其提供的線程池我們可以很方便的寫出高質量的多線程代碼,本節將分析ThreadPoolExecutor的實現,來探索線程池的運行原理。下面的圖片展示了ThreadPoolExecutor的類圖:


Java 線程線程池初探


ThreadPoolExecutor的類圖

下面是幾個比較關鍵的類成員:

 private final BlockingQueue workQueue; // 任務隊列,我們的任務會添加到該隊列裏面,線程將從該隊列獲取任務來執行
private final HashSet workers = new HashSet();//任務的執行值集合,來消費workQueue裏面的任務
private volatile ThreadFactory threadFactory;//線程工廠
private volatile RejectedExecutionHandler handler;//拒絕策略,默認會拋出異異常,還要其他幾種拒絕策略如下:
1、CallerRunsPolicy:在調用者線程裏面運行該任務
2、DiscardPolicy:丟棄任務
3、DiscardOldestPolicy:丟棄workQueue的頭部任務
private volatile int corePoolSize;//最下保活work數量
private volatile int maximumPoolSize;//work上限

我們嘗試執行submit方法,下面是執行的關鍵路徑,總結起來就是:如果Worker數量還沒達到上限則繼續創建,否則提交任務到workQueue,然後讓worker來調度運行任務。

step 1: 
Future> submit(Runnable task);
step 2:
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
step 3:
void execute(Runnable command);
step 4:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //提交我們的額任務到workQueue
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //使用maximumPoolSize作爲邊界
reject(command); //還不行?拒絕提交的任務
}
step 5:
private boolean addWorker(Runnable firstTask, boolean core)
step 6:
w = new Worker(firstTask); //包裝任務
final Thread t = w.thread; //獲取線程(包含任務)
workers.add(w); // 任務被放到works中
t.start(); //執行任務

上面的流程是高度概括的,實際情況遠比這複雜得多,但是我們關心的是怎麼打通整個流程,所以這樣分析問題是沒有太大的問題的。觀察上面的流程,我們發現其實關鍵的地方在於Worker,如果弄明白它是如何工作的,那麼我們也就大概明白了線程池是怎麼工作的了。下面分析一下Worker類。

Java 線程線程池初探


worker類圖

上面的圖片展示了Worker的類關係圖,關鍵在於他實現了Runnable接口,所以問題的關鍵就在於run方法上。在這之前,我們來看一下Worker類裏面的關鍵成員:

 final Thread thread; 
Runnable firstTask; //我們提交的任務,可能被立刻執行,也可能被放到隊列裏面

thread是Worker的工作線程,上面的分析我們也發現了在addWorker中會獲取worker裏面的thread然後start,也就是這個線程的執行,而Worker實現了Runnable接口,所以在構造thread的時候Worker將自己傳遞給了構造函數,thread.start執行的其實就是Worker的run方法。下面是run方法的內容:

 public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

我們來分析一下runWorker這個方法,這就是整個線程池的核心。首先獲取到了我們剛提交的任務firstTask,然後會循環從workQueue裏面獲取任務來執行,獲取任務的方法如下:

 private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

其實核心也就一句:

 Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

我們再回頭看一下execute,其實我們上面只走了一條邏輯,在execute的時候,我們的worker的數量還沒有到達我們設定的corePoolSize的時候,會走上面我們分析的邏輯,而如果達到了我們設定的閾值之後,execute中會嘗試去提交任務,如果提交成功了就結束,否則會拒絕任務的提交。我們上面還提到一個成員:maximumPoolSize,其實線程池的最大的Worker數量應該是maximumPoolSize,但是我們上面的分析是corePoolSize,這是因爲我們的private boolean addWorker(Runnable firstTask, boolean core)的參數core的值來控制的,core爲true則使用corePoolSize來設定邊界,否則使用maximumPoolSize來設定邊界。直觀的解釋一下,當線程池裏面的Worker數量還沒有到corePoolSize,那麼新添加的任務會伴隨着產生一個新的worker,如果Worker的數量達到了corePoolSize,那麼就將任務存放在阻塞隊列中等待Worker來獲取執行,如果沒有辦法再向阻塞隊列放任務了,那麼這個時候maximumPoolSize就變得有用了,新的任務將會伴隨着產生一個新的Worker,如果線程池裏面的Worker已經達到了maximumPoolSize,那麼接下來提交的任務只能被拒絕策略拒絕了。可以參考下面的描述來理解:

 * When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}.

在此需要說明一點,有一個重要的成員:keepAliveTime,當線程池裏面的線程數量超過corePoolSize了,那麼超出的線程將會在空閒keepAliveTime之後被terminated。可以參考下面的文檔:

 * If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
相关文章