TaskA
將在0秒後恢復運行,他的恢復運行時間(wait_until)『最小』,所以就會首先被彈出執行,然後 TaskB
會取代他的位置成為『最小』的元素。
實際執行的任務
@types . coroutine
def sleep ( seconds ):
now = datetime . datetime . now ()
wait_until = now + datetime . timedelta ( seconds = seconds )
actual = yield wait_until
return actual - now
async def countdown ( label , length , * , delay = 0 ):
print ( label , waiting , delay , seconds before starting countdown )
delta = await sleep ( delay )
print ( label , starting after waiting , delta )
while length :
print ( label , T-minus , length )
waited = await sleep ( 1 )
length -= 1
print ( label , lift-off! )
在 delay
秒之後運行一個耗時 length
秒的任務。簡要分析一下代碼:
有一點需要明確, countdown()
返回的是一個 coroutine
對象,你如果不 await
它(或者調用 next()
, send()
),什麼也不會真正執行。
delta=awaitsleep(delay)
這一句,會加入coroutine sleep()
裡面,在第一個 yield 的地方暫停。要想讓它恢復運行,需要通過某種方式"send stuff back"(參考上一篇文章),也就是對這個生成器調用 send()
方法。 後面會看到,實際上這屬於事件循環的工作。
另外,對於每個任務,第一次恢復執行的時間應該是 delay
秒,所以事件循環應該在程序開始 delay
秒的時候調用 send()
。
後面的 while
循環會再次進行運行、暫停的循環,直到時間超過了 length
秒,也就是任務結束。
事件循環代碼
class SleepingLoop:
def __init__(self, *coros):
self._new = coros
self._waiting = []
def run_until_complete(self):
for coro in self._new:
wait_for = coro.send(None)
heapq.heappush(self._waiting, Task(wait_for, coro))
while self._waiting:
now = datetime.datetime.now()
task = heapq.heappop(self._waiting)
if now < task.waiting_until:
delta = task.waiting_until - now
time.sleep(delta.total_seconds())
now = datetime.datetime.now()
try:
# Its time to resume the coroutine.
wait_until = task.coro.send(now)
heapq.heappush(self._waiting, Task(wait_until, task.coro))
except StopIteration:
# The coroutine is done.
pass
def main():
"""Start the event loop, counting down 3 separate launches.
This is what a user would typically write.
"""
loop = SleepingLoop(
countdown(A, 5, delay=0),
countdown(B, 3, delay=2),
countdown(C, 4, delay=1)
)
start = datetime.datetime.now()
loop.run_until_complete()
print(Total elapsed time is, datetime.datetime.now() - start)
if __name__ == __main__:
main()
代碼一共就只有這麼點,是不是很簡單?來分析一下:
for coro in self._new:
wait_for = coro.send(None)
heapq.heappush(self._waiting, Task(wait_for, coro))
wait_for=coro.send(None)
是第一次對這些coroutine對象調用 send()
,如前面所說,這一步會在 sleep
的 actual=yieldwait_until
這個地方停下來。 wait_until
的值會傳給 wait_for
,這是第一次開始任務開始運行的時間。然後把這些Task 對象添加到最小堆裡面。
接下來是一個 while
循環,每個循環從最小堆中取出『最小』的元素,也就是下一次恢復運行時間最近的哪一個任務。如果發現現在還沒到它的恢復執行時間,就調用阻塞 的 time.sleep()
。(這裡可以阻塞,因為這個事件循環非常簡單,我們可以確定這段時間沒有新的任務需要恢復執行。)
接著對 coro
調用 send()
方法,如果還沒遇到 StopIteration
,就把新的 Task 推到最小堆(前面從最小堆裡面取出任務,如果這個任務沒迭代完,就更新它的下次恢復執行時間,再次推到最小堆裡面)。
那麼什麼時候會發生 StopIteration
異常呢?當 countdown()
這個 coroutine 得 while 循環結束的時候,也就是沒有更多的 yield 的時候。
最終的代碼
import datetime
import heapq
import types
import time
class Task:
"""Represent how long a coroutine should wait before starting again.
Comparison operators are implemented for use by heapq. Two-item
tuples unfortunately dont work because when the datetime.datetime
instances are equal, comparison falls to the coroutine and they dont
implement comparison methods, triggering an exception.
Think of this as being like asyncio.Task/curio.Task.
"""
def __init__(self, wait_until, coro):
self.coro = coro
self.waiting_until = wait_until
def __eq__(self, other):
return self.waiting_until == other.waiting_until
def __lt__(self, other):
return self.waiting_until < other.waiting_until
class SleepingLoop:
"""An event loop focused on delaying execution of coroutines.
Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
"""
def __init__(self, *coros):
self._new = coros
self._waiting = []
def run_until_complete(self):
# Start all the coroutines.
for coro in self._new:
wait_for = coro.send(None)
heapq.heappush(self._waiting, Task(wait_for, coro))
# Keep running until there is no more work to do.
while self._waiting:
now = datetime.datetime.now()
# Get the coroutine with the soonest resumption time.
task = heapq.heappop(self._waiting)
if now < task.waiting_until:
# Were ahead of schedule; wait until its time to resume.
delta = task.waiting_until - now
time.sleep(delta.total_seconds())
now = datetime.datetime.now()
try:
# Its time to resume the coroutine.
wait_until = task.coro.send(now)
heapq.heappush(self._waiting, Task(wait_until, task.coro))
except StopIteration:
# The coroutine is done.
pass
@types.coroutine
def sleep(seconds):
"""Pause a coroutine for the specified number of seconds.
Think of this as being like asyncio.sleep()/curio.sleep().
"""
now = datetime.datetime.now()
wait_until = now + datetime.timedelta(seconds=seconds)
# Make all coroutines on the call stack pause; the need to use `yield`
# necessitates this be generator-based and not an async-based coroutine.
actual = yield wait_until
# Resume the execution stack, sending back how long we actually waited.
return actual - now
async def countdown(label, length, *, delay=0):
"""Countdown a launch for `length` seconds, waiting `delay` seconds.
This is what a user would typically write.
"""
print(label, waiting, delay, seconds before starting countdown)
delta = await sleep(delay)
print(label, starting after waiting, delta)
while length:
print(label, T-minus, length)
waited = await sleep(1)
length -= 1
print(label, lift-off!)
def main():
"""Start the event loop, counting down 3 separate launches.
This is what a user would typically write.
"""
loop = SleepingLoop(
countdown(A, 5, delay=0),
# countdown(B, 3, delay=2),
# countdown(C, 4, delay=1)
)
start = datetime.datetime.now()
loop.run_until_complete()
print(Total elapsed time is, datetime.datetime.now() - start)
if __name__ == __main__:
main()
總結一下
把這個例子裡面的元素和 asyncio
做一下對應:
Task
類相當於 asyncio.Task
。本文的 Task
依據 waiting_until
來判斷恢復執行時間; asyncio.Task
是一個 future
對象,當 asyncio
的事件循環檢測到這個 future
對象的狀態發生變化的時候,執行相應的邏輯。
sleep()
函數相等於 asyncio.sleep()
。不會阻塞。
SleepingLoop
相當於 asyncio.BaseEventLoop
。 SleepingLoop
用的是最小堆, asyncio.BaseEventLoop
更加複雜,基於 future
對象,以及 selectors
模塊等。
如果你像我一樣真正熱愛計算機科學,喜歡研究底層邏輯,歡迎關注我的微信公眾號:
推薦閱讀: