The code in this article comes from: snarky.ca/how-the-heck-

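Recap

The previous article covered the history of Python's async and await keywords and noted that async and await are an API, not an implementation. Many event loops are built on async/await, including asyncio and curio. Under the hood, asyncio is based on future objects, while curio is based on tuples.

In this article we use a min-heap to implement a simple event loop.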

The heapq module

Heaps are arrays for which a[k] <= a[2k+1] and a[k] <= a[2k+2] for all k, counting elements from 0. For the sake of comparison, non-existing elements are considered to be infinite. The interesting property of a heap is that a[0] is always its smallest element. (From the source code of Python's built-in heapq module.)

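Simply put, a heap is a Python list with a special property: a[k] <= a[2*k+1] and a[k] <= a[2*k+2], so the first element is always the smallest.

As you have probably noticed, this is simply a binary tree stored in a list: the children of a[k] are a[2*k+1] and a[2*k+2].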
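To make the invariant concrete, here is a minimal sketch that checks it directly. The helper name is_min_heap is made up for this illustration and is not part of heapq.

```python
import heapq


def is_min_heap(a):
    """Return True if a[k] <= a[2*k+1] and a[k] <= a[2*k+2] wherever those indices exist."""
    n = len(a)
    for k in range(n):
        for child in (2 * k + 1, 2 * k + 2):
            # Children past the end of the list count as "infinite", so skip them.
            if child < n and a[k] > a[child]:
                return False
    return True


data = [9, 4, 7, 1, 8, 2]
before = is_min_heap(data)   # 9 > 4 at the root, so the invariant fails
heapq.heapify(data)          # rearrange the list in place
after = is_min_heap(data)
print(before, after, data[0])  # False True 1
```

Note that a heap is not a sorted list: only the parent/child relation is guaranteed, which is enough to keep the smallest element at index 0.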

The heapq module mainly provides the following API:

Usage:

heap = [] # creates an empty heap
heappush(heap, item) # pushes a new item on the heap
item = heappop(heap) # pops the smallest item from the heap
item = heap[0] # smallest item on the heap without popping it
heapify(x) # transforms list into a heap, in-place, in linear time
item = heapreplace(heap, item) # pops and returns smallest item, and adds new item; the heap size is unchanged

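  • Create an empty heap: heap = []
  • Push an item onto the heap: heappush(heap, item)
  • Pop the smallest item from the heap: item = heappop(heap)
  • Read the smallest item without removing it: item = heap[0]
  • Turn a list into a heap in place: heapify(x)
  • Pop the smallest item and push a new one: item = heapreplace(heap, item)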
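The calls listed above can be tried out in a few lines. This is only an illustrative sketch with arbitrary integers; the variable names are made up.

```python
import heapq

heap = []
for item in [5, 1, 4, 2]:
    heapq.heappush(heap, item)

smallest = heap[0]                     # peek at the smallest item without removing it
popped = heapq.heappop(heap)           # removes and returns 1
replaced = heapq.heapreplace(heap, 3)  # pops 2, then pushes 3; size stays the same

# Popping repeatedly yields the remaining items in ascending order.
drained = [heapq.heappop(heap) for _ in range(len(heap))]
print(smallest, popped, replaced, drained)  # 1 1 2 [3, 4, 5]
```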

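The generator send() method

One more recap, since this part can be a little hard to grasp.

next_value = generator.send(value)

Three things happen:

  • The generator resumes execution.
  • value becomes the value of the yield expression the generator is currently paused at.
  • The value the generator produces at its next yield is returned as next_value.

Look at this example: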

>>> def double_inputs():
...     while True:
...         x = yield
...         yield x * 2
...
>>> gen = double_inputs()
>>> next(gen)       # run up to the first yield
>>> gen.send(10)    # goes into x variable
20
>>> next(gen)       # run up to the next yield
>>> gen.send(6)     # goes into x again
12
>>> next(gen)       # run up to the next yield
>>> gen.send(94.3)  # goes into x again
188.6

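Here is what happens when gen.send(10) runs:

  • The generator resumes.
  • 10 is assigned to x by the statement x = yield.
  • x * 2 evaluates to 20. The generator then reaches yield again and pauses, handing x * 2 back as the return value of send(), which is why the statement prints 20.

next(g) is equivalent to g.send(None). It is often used to advance a generator to a yield and leave it paused there.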
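The following sketch shows the same mechanics with a single yield that both receives and produces a value. The generator running_total is invented for this illustration.

```python
def running_total():
    """Yield the running total of every value sent in."""
    total = 0
    while True:
        value = yield total  # pause here; send() supplies `value`
        total += value


gen = running_total()
first = next(gen)      # runs to the first yield, which produces 0
second = gen.send(10)  # value = 10; the next yield produces 10
third = gen.send(5)    # value = 5; the next yield produces 15

other = running_total()
primed = other.send(None)  # same effect as next(other)

# A generator that has not started yet is not paused at a yield,
# so it cannot receive a non-None value.
fresh = running_total()
raised = False
try:
    fresh.send(1)
except TypeError:
    raised = True

print(first, second, third, primed, raised)  # 0 10 15 0 True
```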

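Event loop design

The event loop we are going to implement is simple. Its core features are:

  • It handles many delayed tasks.
  • The task with the earliest scheduled time runs first.
  • If an earlier task takes a long time to finish, it does not block later tasks (that is, the tasks run concurrently, interleaving while they wait).

Code

The Task class

You can think of this as asyncio.Task/curio.Task.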

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

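Two special methods are defined here, __eq__ and __lt__, so that Task instances can be compared with < and ==. Because we use a heapq min-heap, the "smallest" item sits at the front. Task instances are ordered by their waiting_until (the time at which they next resume).

Consider the state of the min-heap at some moment.

TaskA resumes in 0 seconds. Its resumption time (wait_until) is the "smallest", so it is popped and run first, and TaskB then takes its place as the "smallest" element.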
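A minimal sketch of that ordering, using the Task class from above. The delays of 0, 1 and 2 seconds and the use of plain strings in place of coroutine objects are assumptions made purely for illustration.

```python
import datetime
import heapq


class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


now = datetime.datetime.now()
heap = []
# Pushed out of order on purpose; the strings stand in for coroutine objects.
for name, delay in [("TaskC", 2), ("TaskA", 0), ("TaskB", 1)]:
    heapq.heappush(heap, Task(now + datetime.timedelta(seconds=delay), name))

front = heap[0].coro  # the task with the earliest resumption time
order = [heapq.heappop(heap).coro for _ in range(3)]
print(front, order)  # TaskA ['TaskA', 'TaskB', 'TaskC']
```

heapq only ever compares items with <, so __lt__ is the method that actually drives the ordering here.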

The tasks that actually run

import datetime
import types

@types.coroutine
def sleep(seconds):
    # Compute the absolute time at which this coroutine should resume.
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Pause here; the event loop sends back the actual resumption time.
    actual = yield wait_until
    return actual - now

async def countdown(label, length, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')

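This runs a task that takes length seconds, starting after delay seconds. A brief analysis of the code:

One thing to be clear about: countdown() returns a coroutine object. Unless you await it (or drive it by calling send() on it), nothing actually runs.

The line delta = await sleep(delay) enters the sleep() coroutine and pauses at its first yield. To resume it, something has to "send stuff back" (see the previous article), that is, call send() on this generator. As we will see, that is the event loop's job.

Also, each task should first resume after delay seconds, so the event loop should call send() once delay seconds have passed since the program started.

The while loop that follows repeats the run-and-pause cycle until length seconds have elapsed, at which point the task is finished.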
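To see these points without an event loop, a coroutine can be driven by hand. This is only a sketch: short_task is a made-up coroutine, and sleep() is repeated from above so the snippet runs on its own.

```python
import datetime
import types


@types.coroutine
def sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now


async def short_task(label, steps):
    steps.append("started")
    delta = await sleep(0)
    steps.append("resumed")
    return f"{label} waited {delta}"


steps = []
coro = short_task("A", steps)  # creates the coroutine object; nothing runs yet
ran_before_send = list(steps)  # still empty

wait_until = coro.send(None)   # runs up to the yield inside sleep()
try:
    # Play the role of the event loop: send back the actual resumption time.
    coro.send(datetime.datetime.now())
except StopIteration as exc:
    result = exc.value         # the coroutine's return value

print(ran_before_send, steps)  # [] ['started', 'resumed']
```

The value that comes out of the first send() is the wait_until yielded inside sleep(), which is exactly what the event loop uses to decide when to resume the task.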

Event loop code

import datetime
import heapq
import time

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()

That is all the code there is. Simple, isn't it? Let's walk through it:

for coro in self._new:
    wait_for = coro.send(None)
    heapq.heappush(self._waiting, Task(wait_for, coro))

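wait_for = coro.send(None) is the first time send() is called on these coroutine objects. As described earlier, this step pauses at actual = yield wait_until inside sleep(). The value of wait_until is passed out as wait_for; it is the time at which the task should first resume. The Task objects are then pushed onto the min-heap.

Next comes a while loop. Each iteration pops the "smallest" element from the min-heap, that is, the task whose next resumption time is soonest. If that time has not arrived yet, the loop calls the blocking time.sleep(). (Blocking is fine here: this event loop is so simple that we can be sure no other task needs to resume in the meantime.)

Then send() is called on the coroutine. If StopIteration is not raised, a new Task is pushed onto the min-heap (in other words, a task that was popped from the heap but has not finished gets its next resumption time updated and is pushed back).

So when is StopIteration raised? When the while loop in the countdown() coroutine ends and the coroutine returns, that is, when there are no more yields.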
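The whole cycle can be condensed into a short, fast-running sketch. It deliberately differs from the code above in two ways, both assumptions of this illustration: it uses time.monotonic() floats instead of datetime objects, and it stores (wake_at, index, coro) tuples, where the unique index breaks ties so the coroutines themselves are never compared. The names job and run are made up.

```python
import heapq
import time
import types


@types.coroutine
def sleep(seconds):
    # Yield the absolute wake-up time to whoever is driving the coroutine.
    yield time.monotonic() + seconds


async def job(label, delay, log):
    await sleep(delay)
    log.append(label)


def run(*coros):
    waiting = []
    # Start every coroutine and record when it wants to wake up.
    for index, coro in enumerate(coros):
        wake_at = coro.send(None)
        heapq.heappush(waiting, (wake_at, index, coro))
    while waiting:
        # Pop the entry with the earliest wake-up time.
        wake_at, index, coro = heapq.heappop(waiting)
        remaining = wake_at - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        try:
            wake_at = coro.send(time.monotonic())
            heapq.heappush(waiting, (wake_at, index, coro))
        except StopIteration:
            pass  # this coroutine has finished


log = []
run(job("A", 0.03, log), job("B", 0.01, log), job("C", 0.02, log))
print(log)  # ['B', 'C', 'A']
```

The jobs finish in order of their wake-up times rather than in the order they were passed in, which is the behavior the min-heap is there to provide.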

The final code

import datetime
import heapq
import types
import time

class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

@types.coroutine
def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now

async def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = await sleep(1)
        length -= 1
    print(label, 'lift-off!')

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5, delay=0),
        # countdown('B', 3, delay=2),
        # countdown('C', 4, delay=1)
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()

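Summary

Here is how the pieces of this example map onto asyncio:

  • The Task class corresponds to asyncio.Task. The Task in this article uses waiting_until to decide when to resume; asyncio.Task is a future object, and when asyncio's event loop detects that the future's state has changed, it runs the corresponding logic.
  • The sleep() function is equivalent to asyncio.sleep(). It pauses only the calling coroutine and does not block the others.
  • SleepingLoop corresponds to asyncio.BaseEventLoop. SleepingLoop uses a min-heap; asyncio.BaseEventLoop is more complex, built on future objects, the selectors module, and more.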
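For comparison, a rough asyncio counterpart of the countdown example might look like the sketch below. The tick parameter and the shortened delays are assumptions added so the snippet finishes quickly; they are not part of the original example.

```python
import asyncio


async def countdown(label, length, *, delay=0.0, tick=0.01):
    # asyncio.sleep() suspends only this coroutine; the loop runs the others.
    await asyncio.sleep(delay)
    while length:
        await asyncio.sleep(tick)
        length -= 1
    return label


async def main():
    # gather() runs the coroutines concurrently and returns their results
    # in the order the coroutines were passed in.
    return await asyncio.gather(
        countdown("A", 5),
        countdown("B", 3, delay=0.02),
        countdown("C", 4, delay=0.01),
    )


results = asyncio.run(main())
print(results)  # ['A', 'B', 'C']
```

Here asyncio.run() plays the role of SleepingLoop.run_until_complete(), and asyncio.sleep() plays the role of the hand-written sleep().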
