本文轉自 Bennyhuo 的博客

原文地址:bennyhuo.com/2019/04/11

上一篇我們知道了協程啟動的幾種模式,也通過示例認識了 launch 啟動協程的使用方法,本文將延續這些內容從調度的角度來進一步為大家揭示協程的奧義。

1. 協程上下文

調度器本質上就是一個協程上下文的實現,我們先來介紹下上下文。

前面我們提到 launch 函數有三個參數,第一個參數叫 上下文,它的介面類型是 CoroutineContext,通常我們見到的上下文的類型是 CombinedContext 或者 EmptyCoroutineContext,一個表示上下文的組合,另一個表示什麼都沒有。我們來看下 CoroutineContext 的介面方法:

@SinceKotlin("1.3")
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
public fun minusKey(key: Key<*>): CoroutineContext

public interface Key<E : Element>

public interface Element : CoroutineContext {
public val key: Key<*>
...
}
}

不知道大家有沒有發現,它簡直就是一個以 Key 為索引的 List

表中的 List.plus(List) 實際上指的是擴展方法 Collection<T>.plus(elements: Iterable<T>): List<T>

CoroutineContext 作為一個集合,它的元素就是源碼中看到的 Element,每一個 Element 都有一個 key,因此它可以作為元素出現,同時它也是 CoroutineContext 的子介面,因此也可以作為集合出現。

講到這裡,大家就會明白,CoroutineContext 原來是個數據結構啊。如果大家對於 List 的遞歸定義比較熟悉的話,那麼對於 CombinedContextEmptyCoroutineContext 也就很容易理解了,例如 scala 的 List是這麼定義的:

sealed abstract class List[+A] extends ... {
...
def head: A
def tail: List[A]
...
}

在模式匹配的時候,List(1,2,3,4) 是可以匹配 x::y 的,x 就是 1,y 則是 List(2,3,4)

CombinedContext 的定義也非常類似:

internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
...
}

只不過它是反過來的,前面是集合,後面是單獨的一個元素。我們在協程體裡面訪問到的 coroutineContext 大多是這個 CombinedContext 類型,表示有很多具體的上下文實現的集合,我們如果想要找到某一個特別的上下文實現,就需要用對應的 Key 來查找,例如:

suspend fun main(){
GlobalScope.launch {
println(coroutineContext[Job]) // "coroutine#1":StandaloneCoroutine{Active}@1ff62014
}
println(coroutineContext[Job]) // null,suspend main 雖然也是協程體,但它是更底層的邏輯,因此沒有 Job 實例
}

這裡的 Job 實際上是對它的 companion object 的引用

public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job> { ... }
...
}

所以我們也可以仿照 Thread.currentThread() 來一個獲取當前 Job 的方法:

```kotlin suspend inline fun Job.Key.currentJob() = coroutineContext[Job] suspend fun coroutineJob(){ GlobalScope.launch { log(Job.currentJob()) } log(Job.currentJob()) } ```

我們可以通過指定上下文為協程添加一些特性,一個很好的例子就是為協程添加名稱,方便調試:

GlobalScope.launch(CoroutineName("Hello")) {
...
}

如果有多個上下文需要添加,直接用 + 就可以了:

GlobalScope.launch(Dispatchers.Main + CoroutineName("Hello")) {
...
}

Dispatchers.Main 是調度器的一個實現,不用擔心,我們很快就會認識它了。

2. 協程攔截器

費了好大勁兒說完上下文,這裡就要說一個比較特殊的存在了——攔截器。

public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>

public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}

攔截器也是一個上下文的實現方向,攔截器可以左右你的協程的執行,同時為了保證它的功能的正確性,協程上下文集合永遠將它放在最後面,這真可謂是天選之子了。

它攔截協程的方法也很簡單,因為協程的本質就是回調 + 「黑魔法」,而這個回調就是被攔截的 Continuation 了。用過 OkHttp 的小夥伴一下就興奮了,攔截器我常用的啊,OkHttp 用攔截器做緩存,打日誌,還可以模擬請求,協程攔截器也是一樣的道理。調度器就是基於攔截器實現的,換句話說調度器就是攔截器的一種。

我們可以自己定義一個攔截器放到我們的協程上下文中,看看會發生什麼。

class MyContinuationInterceptor: ContinuationInterceptor{
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
override val context = continuation.context
override fun resumeWith(result: Result<T>) {
log("<MyContinuation> $result" )
continuation.resumeWith(result)
}
}

我們只是在回調處打了一行日誌。接下來我們把用例拿出來:

suspend fun main() {
GlobalScope.launch(MyContinuationInterceptor()) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}

這可能是迄今而止我們給出的最複雜的例子了,不過請大家不要被它嚇到,它依然很簡單。我們通過 launch 啟動了一個協程,為它指定了我們自己的攔截器作為上下文,緊接著在其中用 async 啟動了一個協程,asynclaunch 從功能上是同等類型的函數,它們都被稱作協程的 Builder 函數,不同之處在於 async 啟動的 Job 也就是實際上的 Deferred 可以有返回結果,可以通過 await 方法獲取。

可想而知,result 的值就是 Hello。那麼這段程序運行的結果如何呢?

15:31:55:989 [main] <MyContinuation> Success(kotlin.Unit) // ①
15:31:55:992 [main] 1
15:31:56:000 [main] <MyContinuation> Success(kotlin.Unit) // ②
15:31:56:000 [main] 2
15:31:56:031 [main] 4
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(kotlin.Unit) // ③
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] 3
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(Hello) // ④
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 5. Hello
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 6

「// ①」 不是程序輸出的內容,僅為後續講解方便而做的標註。

大家可能就要奇怪了,你不是說 Continuation 是回調麼,這裡面回調調用也就一次啊(await 那裡),怎麼日誌列印了四次呢?

別慌,我們按順序給大家介紹。

首先,所有協程啟動的時候,都會有一次 Continuation.resumeWith 的操作,這一次操作對於調度器來說就是一次調度的機會,我們的協程有機會調度到其他線程的關鍵之處就在於此。 ①、② 兩處都是這種情況。

其次,delay 是掛起點,1000ms 之後需要繼續調度執行該協程,因此就有了 ③ 處的日誌。

最後,④ 處的日誌就很容易理解了,正是我們的返回結果。

可能有朋友還會有疑問,我並沒有在攔截器當中切換線程,為什麼從 ③ 處開始有了線程切換的操作?這個切換線程的邏輯源自於 delay,在 JVM 上 delay 實際上是在一個 ScheduledExcecutor 裡面添加了一個延時任務,因此會發生線程切換;而在 JavaScript 環境中則是基於 setTimeout,如果運行在 Nodejs 上,delay 就不會切線程了,畢竟人家是單線程的。

如果我們在攔截器當中自己處理了線程切換,那麼就實現了自己的一個簡單的調度器,大家有興趣可以自己去嘗試。

思考:攔截器可以有多個嗎?

3. 調度器

3.1 概述

有了前面的基礎,我們對於調度器的介紹就變得水到渠成了。

public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}

它本身是協程上下文的子類,同時實現了攔截器的介面, dispatch 方法會在攔截器的方法 interceptContinuation 中調用,進而實現協程的調度。所以如果我們想要實現自己的調度器,繼承這個類就可以了,不過通常我們都用現成的,它們定義在 Dispatchers 當中:

val Default: CoroutineDispatcher
val Main: MainCoroutineDispatcher
val Unconfined: CoroutineDispatcher

這個類的定義涉及到了 Kotlin MPP 的支持,因此你在 Jvm 版本當中還會看到 val IO: CoroutineDispatcher,在 js 和 native 當中就只有前面提到的這三個了(對 Jvm 好偏心吶)。

  • IO 僅在 Jvm 上有定義,它基於 Default 調度器背後的線程池,並實現了獨立的隊列和限制,因此協程調度器從 Default 切換到 IO 並不會觸發線程切換。
  • Main 主要用於 UI 相關程序,在 Jvm 上包括 Swing、JavaFx、Android,可將協程調度到各自的 UI 線程上。
  • Js 本身就是單線程的事件循環,與 Jvm 上的 UI 程序比較類似。

3.2 編寫 UI 相關程序

Kotlin 的用戶絕大多數都是 Android 開發者,大家對 UI 的開發需求還是比較大的。我們舉一個很常見的場景,點擊一個按鈕做點兒非同步的操作再回調刷新 UI:

getUserBtn.setOnClickListener {
getUser { user ->
handler.post {
userNameView.text = user.name
}
}
}

我們簡單得給出 getUser 函數的聲明:

typealias Callback = (User) -> Unit

fun getUser(callback: Callback){
...
}

由於 getUser 函數需要切到其他線程執行,因此回調通常也會在這個非 UI 的線程中調用,所以為了確保 UI 正確被刷新,我們需要用 handler.post 切換到 UI 線程。上面的寫法就是我們最古老的寫法了。

後來又有了 RxJava,那麼事情開始變得有趣了起來:

fun getUserObservable(): Observable<User> {
return Observable.create<User> { emitter ->
getUser {
emitter.onNext(it)
}
}
}

於是點擊按鈕的事件可以這麼寫:

getUserBtn.setOnClickListener {
getUserObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { user ->
userNameView.text = user.name
}
}

其實 RxJava 在線程切換上的表現是非常優秀的,也正是如此,很多人甚至用它只是為了切線程方便!

那麼我們現在把這段代碼過渡到協程的寫法:

suspend fun getUserCoroutine() = suspendCoroutine<User> {
continuation ->
getUser {
continuation.resume(it)
}
}

按鈕點擊時,我們可以:

getUserBtn.setOnClickListener {
GlobalScope.launch(Dispatchers.Main) {
userNameView.text = getUserCoroutine().name
}
}

大家也可以用 anko-coroutines 當中的 View.onClick 擴展,這樣我們就無需自己在這裡用 launch 啟動協程了。有關 Anko 對協程的支持,我們後面專門安排一篇文章介紹。

這裡又有大家沒見過的內容啦,suspendCoroutine 這個方法並不是幫我們啟動協程的,它運行在協程當中並且幫我們獲取到當前協程的 Continuation 實例,也就是拿到回調,方便後面我們調用它的 resume 或者 resumeWithException 來返回結果或者拋出異常。

如果你重複調用 resume 或者 resumeWithException 會收穫一枚 IllegalStateException,仔細想想這是為什麼。

對比前面的 RxJava 的做法,你會發現這段代碼其實很容易理解,你甚至會發現協程的使用場景與 RxJava 竟是如此的相似。這裡我們用到了 Dispatchers.Main 來確保 launch 啟動的協程在調度時始終調度到 UI 線程,那麼下面我們來看看 Dispatchers.Main 的具體實現。

在 Jvm 上,Main 的實現也比較有意思:

internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = MainDispatcherFactory::class.java.let { clz ->
ServiceLoader.load(clz, clz.classLoader).toList()
}
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: MissingMainCoroutineDispatcher(null)
} catch (e: Throwable) {
MissingMainCoroutineDispatcher(e)
}
}
}

在 Android 當中,協程框架通過註冊 AndroidDispatcherFactory 使得 Main 最終被賦值為 HandlerDispatcher 的實例,有興趣的可以去看下 kotlinx-coroutines-android 的源碼實現。

注意前面對於 RxJava 和協程的實現,我們都沒有考慮異常和取消的問題。有關異常和取消的話題,我們會在後面的文章中詳細介紹。

3.3 綁定到任意線程的調度器

調度器的目的就是切線程,你不要想著我在 dispatch 的時候根據自己的心情來隨機調用,那你是在害你自己(不怕各位笑話,這樣的代碼我還真寫過,僅供娛樂)。那麼問題就簡單了,我們只要提供線程,調度器就應該很方便的創建出來:

suspend fun main() {
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(myDispatcher) {
log(1)
}.join()
log(2)
}

輸出的信息就表明協程運行在我們自己的線程上。

16:10:57:130 [MyThread] 1
16:10:57:136 [MyThread] 2

不過請大家注意,由於這個線程池是我們自己創建的,因此我們需要在合適的時候關閉它,不然的話:

我們可以通過主動關閉線程池或者調用:

myDispatcher.close()

來結束它的生命週期,再次運行程序就會正常退出了。

當然有人會說你創建的線程池的線程不是 daemon 的,所以主線程結束時 Jvm 不會停止運行。說的沒錯,但該釋放的還是要及時釋放,如果你只是在程序的整個生命週期當中短暫的用了一下這個調度器,那麼一直不關閉它對應的線程池豈不是會有線程泄露嗎?這就很尷尬了。

Kotlin 協程設計者也特別害怕大家注意不到這一點,還特地廢棄了兩個 API 並且開了一個 issue 說我們要重做這套 API,這兩個可憐的傢伙是誰呢?

廢棄的兩個基於線程池創建調度器的 API

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

這二者可以很方便的創建綁定到特定線程的調度器,但過於簡潔的 API 似乎會讓人忘記它的風險。Kotlin 一向不愛做這種不清不楚的事兒,所以您呢,還是像我們這一節例子當中那樣自己去構造線程池吧,這樣好歹自己忘了關閉也怨不著別人(哈哈哈)。

其實在多個線程上運行協程,線程總是這樣切來切去其實並不會顯得很輕量級,例如下面的例子就是比較可怕的了:

Executors.newFixedThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
GlobalScope.launch(dispatcher) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}

這裡面除了 delay 那裡有一次不可避免的線程切換外,其他幾處協程掛起點的繼續操作(Continuation.resume)都會切線程:

16:28:04:771 [pool-1-thread-1] 1
16:28:04:779 [pool-1-thread-1] 4
16:28:04:779 [pool-1-thread-2] 2
16:28:05:790 [pool-1-thread-3] 3
16:28:05:793 [pool-1-thread-4] 5. Hello
16:28:05:794 [pool-1-thread-4] 6

如果我們的線程池只開 1 個線程,那麼這裡所有的輸出都將在這唯一的線程中列印:

16:40:14:685 [pool-1-thread-1] 1
16:40:14:706 [pool-1-thread-1] 4
16:40:14:710 [pool-1-thread-1] 2
16:40:15:723 [pool-1-thread-1] 3
16:40:15:725 [pool-1-thread-1] 5. Hello
16:40:15:725 [pool-1-thread-1] 6

對比這二者,10個線程的情況線程切換次數最少 3次,而 1 個線程的情況則只要 delay 1000ms 之後恢復執行的時候那一次。只是多兩次線程切換,到底會有多大影響呢?我在我自己的 2015 款 mbp 上對於兩種不同的情況分別循環運行 100 次,得到的平均時間如下:

注意,為了測試的公平性,在運行 100 次循環之前已經做好了預熱,確保所有類都已經載入。測試結果僅供參考。

也就是說多兩次線程切換平均能多出 1ms 的耗時。生產環境當中的代碼當然會更複雜,如果這樣用線程池去調度,結果可想而知。

實際上通常我們只需要在一個線程當中處理自己的業務邏輯,只有一些耗時的 IO 才需要切換到 IO 線程中處理,所以好的做法可以參考 UI 對應的調度器,自己通過線程池定義調度器的做法本身沒什麼問題,但最好只用一個線程,因為多線程除了前面說的線程切換的開銷外,還有線程安全的問題。

3.4 線程安全問題

Js 和 Native 的並發模型與 Jvm 不同,Jvm 暴露了線程 API 給用戶,這也使得協程的調度可以由用戶更靈活的選擇。越多的自由,意味著越多的代價,我們在 Jvm 上面編寫協程代碼時需要明白一點的是,線程安全問題在調度器不同的協程之間仍然存在。

好的做法,就像我們前面一節提到的,盡量把自己的邏輯控制在一個線程之內,這樣一方面節省了線程切換的開銷,另一方面還可以避免線程安全問題,兩全其美。

如果大家在協程代碼中使用鎖之類的並發工具就反而增加了代碼的複雜度,對此我的建議是大家在編寫協程代碼時盡量避免對外部作用域的可變變數進行引用,盡量使用參數傳遞而非對全局變數進行引用。

以下是一個錯誤的例子,大家很容易就能想明白:

suspend fun main(){
var i = 0
Executors.newFixedThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
List(1000000) {
GlobalScope.launch(dispatcher) {
i++
}
}.forEach {
it.join()
}
}
log(i)
}

輸出的結果:

16:59:28:080 [main] 999593

4. suspend main 函數如何調度?

上一篇文章我們提到了 suspend main 會啟動一個協程,我們示例中的協程都是它的子協程,可是這個最外層的協程到底是怎麼來的呢?

我們先給出一個例子:

suspend fun main() {
log(1)
GlobalScope.launch {
log(2)
}.join()
log(3)
}

它等價於下面的寫法:

fun main() {
runSuspend {
log(1)
GlobalScope.launch {
log(2)
}.join()
log(3)
}
}

那你說這個 runSuspend 又是何妨神聖?它是 Kotlin 標準庫的一個方法,注意它不是 kotlinx.coroutines 當中的,它實際上屬於更底層的 API 了。

internal fun runSuspend(block: suspend () -> Unit) {
val run = RunSuspend()
block.startCoroutine(run)
run.await()
}

而這裡面的 RunSuspend 則是 Continuation 的實現:

private class RunSuspend : Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

var result: Result<Unit>? = null

override fun resumeWith(result: Result<Unit>) = synchronized(this) {
this.result = result
(this as Object).notifyAll()
}

fun await() = synchronized(this) {
while (true) {
when (val result = this.result) {
null -> (this as Object).wait()
else -> {
result.getOrThrow() // throw up failure
return
}
}
}
}
}

它的上下文是空的,因此 suspend main 啟動的協程並不會有任何調度行為。

通過這個例子我們可以知道,實際上啟動一個協程只需要有一個 lambda 表達式就可以了,想當年 Kotlin 1.1 剛發布的時候,我寫了一系列的教程都是以標準庫 API 為基礎的,後來發現標準庫的 API 也許真的不是給我們用的,所以看看就好。

上述代碼在標準庫當中被修飾為 internal,因此我們無法直接使用它們。不過你可以把 RunSuspend.kt 當中的內容複製到你的工程當中,這樣你就可以直接使用啦,其中的 var result: Result<Unit>? = null 可能會報錯,沒關係,改成 private var result: Result<Unit>? = null 就可以了。

5. 小結

在這篇文章當中,我們介紹了協程上下文,介紹了攔截器,進而最終引出了我們的調度器,截止目前,我們還有異常處理、協程取消、Anko 對協程的支持等話題沒有講到,如果大家有協程相關想了解的話題,可以留言哈~


想要找到好 Offer、想要實現技術進階的迷茫中的 Android 工程師們,推薦大家關注下我的新課《破解Android高級面試》,這門課已經更新完畢,涉及內容均非淺嘗輒止,目前已經有300+同學在學習,你還在等什麼(*≧∪≦):

掃描二維碼或者點擊鏈接《破解Android高級面試》即可進入課程啦!


推薦閱讀:
相關文章