關鍵詞:Kotlin 協程 序列 Sequence

說出來你可能不信,Kotlin 1.1 協程還在喫奶的時候,Sequence 就已經正式推出了,然而,Sequence 生成器的實現居然有協程的功勞。

1. 認識 Sequence

在 Kotlin 當中,Sequence 這個概念確切的說是「懶序列」,產生懶序列的方式可以有多種,下面我們介紹一種由基於協程實現的序列生成器。需要注意的是,這個功能內置於 Kotlin 標準庫當中,不需要額外添加依賴。

下面我們給出一個斐波那契數列生成的例子:

val fibonacci = sequence {
yield(1L) // first Fibonacci number
var cur = 1L
var next = 1L
while (true) {
yield(next) // next Fibonacci number
val tmp = cur + next
cur = next
next = tmp
}
}

fibonacci.take(5).forEach(::log)

這個 sequence 實際上也是啟動了一個協程,yield 則是一個掛起點,每次調用時先將參數保存起來作為生成的序列迭代器的下一個值,之後返回 COROUTINE_SUSPENDED,這樣協程就不再繼續執行,而是等待下一次 resume 或者 resumeWithException 的調用,而實際上,這下一次的調用就在生成的序列的迭代器的 next() 調用時執行。如此一來,外部在遍歷序列時,每次需要讀取新值時,協程內部就會執行到下一次 yield 調用。

程序運行輸出的結果如下:

10:44:34:071 [main] 1
10:44:34:071 [main] 1
10:44:34:071 [main] 2
10:44:34:071 [main] 3
10:44:34:071 [main] 5

除了使用 yield(T) 生成序列的下一個元素以外,我們還可以用 yieldAll() 來生成多個元素:

val seq = sequence {
log("yield 1,2,3")
yieldAll(listOf(1, 2, 3))
log("yield 4,5,6")
yieldAll(listOf(4, 5, 6))
log("yield 7,8,9")
yieldAll(listOf(7, 8, 9))
}

seq.take(5).forEach(::log)

從運行結果我們可以看到,在讀取 4 的時候才會去執行到 yieldAll(listOf(4, 5, 6)),而由於 7 以後都沒有被訪問到,yieldAll(listOf(7, 8, 9)) 並不會被執行,這就是所謂的「懶」。

10:44:34:029 [main] yield 1,2,3
10:44:34:060 [main] 1
10:44:34:060 [main] 2
10:44:34:060 [main] 3
10:44:34:061 [main] yield 4,5,6
10:44:34:061 [main] 4
10:44:34:066 [main] 5

2. 深入序列生成器

前面我們已經不止一次提到 COROUTINE_SUSPENDED 了,我們也很容易就知道 yieldyieldAll 都是 suspend 函數,既然能做到」懶「,那麼必然在 yieldyieldAll 處是掛起的,因此它們的返回值一定是 COROUTINE_SUSPENDED,這一點我們在本文的開頭就已經提到,下面我們來見識一下廬山真面目:

override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}

這是 yield 的實現,我們看到了老朋友 suspendCoroutineUninterceptedOrReturn,還看到了 COROUTINE_SUSPENDED,那麼掛起的問題就很好理解了。而 yieldAll 是如出一轍:

override suspend fun yieldAll(iterator: Iterator<T>) {
if (!iterator.hasNext()) return
nextIterator = iterator
state = State_ManyReady
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}

唯一的不同在於 state 的值,一個流轉到了 State_Ready,一個是 State_ManyReady,也倒是很好理解嘛。

那麼現在就剩下一個問題了,既然有了掛起,那麼什麼時候執行 resume ?這個很容易想到,我們在迭代序列的時候唄,也就是序列迭代器的 next() 的時候,那麼這事兒就好辦了,找下序列的迭代器實現即可,這個類型我們也很容易找到,顯然 yield 就是它的方法,我們來看看 next 方法的實現:

override fun next(): T {
when (state) {
State_NotReady, State_ManyNotReady -> return nextNotReady() // ①
State_ManyReady -> { // ②
state = State_ManyNotReady
return nextIterator!!.next()
}
State_Ready -> { // ③
state = State_NotReady
val result = nextValue as T
nextValue = null
return result
}
else -> throw exceptionalState()
}
}

我們來依次看下這三個條件:

  • ① 是下一個元素還沒有準備好的情況,調用 nextNotReady 會首先調用 hasNext 檢查是否有下一個元素,檢查的過程其實就是調用 Continuation.resume,如果有元素,就會再次調用 next,否則就拋異常
  • ② 表示我們調用了 yieldAll,一下子傳入了很多元素,目前還沒有讀取完,因此需要繼續從傳入的這個元素集合當中去迭代
  • ③ 表示我們調用了一次 yield,而這個元素的值就存在 nextValue 當中

hasNext 的實現也不是很複雜:

override fun hasNext(): Boolean {
while (true) {
when (state) {
State_NotReady -> {} // ①
State_ManyNotReady -> // ②
if (nextIterator!!.hasNext()) {
state = State_ManyReady
return true
} else {
nextIterator = null
}
State_Done -> return false // ③
State_Ready, State_ManyReady -> return true // ④
else -> throw exceptionalState()
}
state = State_Failed
val step = nextStep!!
nextStep = null
step.resume(Unit)
}
}

我們在通過 next 讀取完一個元素之後,如果已經傳入的元素已經沒有剩餘,狀態會轉為 State_NotReady,下一次取元素的時候就會在 next 中觸發到 hasNext 的調用,① 處什麼都沒有幹,因此會直接落到後面的 step.resume(),這樣就會繼續執行我們序列生成器的代碼,直到遇到 yield 或者 yieldAll

3. 小結

序列生成器很好的利用了協程的狀態機特性,將序列生成的過程從形式上整合到了一起,讓程序更加緊湊,表現力更強。本節討論的序列,某種意義上更像是生產 - 消費者模型中的生產者,而迭代序列的一方則像是消費者,其實在 kotlinx.coroutines 庫中提供了更為強大的能力來實現生產 - 消費者模式,我們將在後面的文章當中展示給大家看。

協程的回調特性可以讓我們在實踐當中很好的替代傳統回調的寫法,同時它的狀態機特性也可以讓曾經的狀態機實現獲得新的寫法,除了序列之外,也許還會有更多有趣的適用場景等待我們去發掘~


歡迎關注 Kotlin 中文社區!

中文官網:https://www.kotlincn.net/

中文官方博客:Kotlin Blog - Kotlin 中文博客

公眾號:Kotlin

知乎專欄:Kotlin

CSDN:Kotlin中文社區

掘金:Kotlin中文社區

簡書:Kotlin中文社區


推薦閱讀:
相關文章