asyncio模塊提供了使用協程構建並發應用的工具。它使用一種單線程單進程的的方式實現並發,應用的各個部分彼此合作, 可以顯示的切換任務,一般會在程序阻塞I/O操作的時候發生上下文切換如等待讀寫文件,或者請求網路。同時asyncio也支持調度代碼在將來的某個特定事件運行,從而支持一個協程等待另一個協程完成,以處理系統信號和識別其他一些事件。

非同步並發的概念

對於其他的並發模型大多數採取的都是線性的方式編寫。並且依賴於語言運行時系統或操作系統的底層線程或進程來適當地改變上下文,而基於asyncio的應用要求應用代碼顯示的處理上下文切換。

asyncio提供的框架以事件循環(event loop)為中心,程序開啟一個無限的循環,程序會把一些函數註冊到事件循環上。當滿足事件發生的時候,調用相應的協程函數。

事件循環

事件循環是一種處理多並發量的有效方式,在維基百科中它被描述為「一種等待程序分配事件或消息的編程架構」,我們可以定義事件循環來簡化使用輪詢方法來監控事件,通俗的說法就是「當A發生時,執行B」。事件循環利用poller對象,使得程序員不用控制任務的添加、刪除和事件的控制。事件循環使用回調方法來知道事件的發生。它是asyncio提供的「中央處理設備」,支持如下操作:

  • 註冊、執行和取消延遲調用(超時)
  • 創建可用於多種類型的通信的服務端和客戶端的Transports
  • 啟動進程以及相關的和外部通信程序的Transports
  • 將耗時函數調用委託給一個線程池
  • 單線程(進程)的架構也避免的多線程(進程)修改可變狀態的鎖的問題。

與事件循環交互的應用要顯示地註冊將運行的代碼,讓事件循環在資源可用時嚮應用代碼發出必要的調用。如:一個套接字再沒有更多的數據可以讀取,那麼伺服器會把控制全交給事件循環。

Future

future是一個數據結構,表示還未完成的工作結果。事件循環可以監視Future對象是否完成。從而允許應用的一部分等待另一部分完成一些工作。

Task

task是Future的一個子類,它知道如何包裝和管理一個協程的執行。任務所需的資源可用時,事件循環會調度任務允許,並生成一個結果,從而可以由其他協程消費。

非同步方法

使用asyncio也就意味著你需要一直寫非同步方法。

一個標準方法是這樣的:

def regular_double(x):
return 2 * x

而一個非同步方法:

async def async_double(x):
return 2 * x

從外觀上看非同步方法和標準方法沒什麼區別只是前面多了個async。

「Async」 是「asynchronous」的簡寫,為了區別於非同步函數,我們稱標準函數為同步函數,從用戶角度非同步函數和同步函數有以下區別:

要調用非同步函數,必須使用await關鍵字。 因此,不要寫regular_double(3),而是寫await async_double(3).

不能在同步函數裏使用await,否則會出錯。

句法錯誤:

def print_double(x):
print(await async_double(x)) # <-- SyntaxError here

但是在非同步函數中,await是被允許的:

async def print_double(x):
print(await async_double(x)) # <-- OK!

協程

啟動一個協程

一般非同步方法被稱之為協程(Coroutine)。asyncio事件循環可以通過多種不同的方法啟動一個協程。一般對於入口函數,最簡答的方法就是使用run_until_complete(),並將協程直接傳入這個方法。

import asyncio

async def foo():
print("這是一個協程")

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
print("開始運行協程")
coro = foo()
print("進入事件循環")
loop.run_until_complete(coro)
finally:
print("關閉事件循環")
loop.close()

輸出

開始運行協程
進入事件循環
這是一個協程
關閉事件循環

這就是最簡單的一個協程的例子,下面讓我們瞭解一下上面的代碼.

第一步首先得到一個事件循環的應用也就是定義的對象loop。可以使用默認的事件循環,也可以實例化一個特定的循環類(比如uvloop),這裡使用了默認循環run_until_complete(coro)方法用這個協程啟動循環,協程返回時這個方法將停止循環。run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,其中task是Future的子類。關於task和future後面會提到。

從協程中返回值

將上面的代碼,改寫成下面代碼

import asyncio

async def foo():
print("這是一個協程")
return "返回值"

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
print("開始運行協程")
coro = foo()
print("進入事件循環")
result = loop.run_until_complete(coro)
print(f"run_until_complete可以獲取協程的{result},默認輸出None")
finally:
print("關閉事件循環")
loop.close()

run_until_complete可以獲取協程的返回值,如果沒有給定返回值,則像函數一樣,默認返回None。

協程調用協程

一個協程可以啟動另一個協程,從而可以任務根據工作內容,封裝到不同的協程中。我們可以在協程中使用await關鍵字,鏈式的調度協程,來形成一個協程任務流。向下面的例子一樣。

import asyncio

async def main():
print("主協程")
print("等待result1協程運行")
res1 = await result1()
print("等待result2協程運行")
res2 = await result2(res1)
return (res1,res2)

async def result1():
print("這是result1協程")
return "result1"

async def result2(arg):
print("這是result2協程")
return f"result2接收了一個參數,{arg}"

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(main())
print(f"獲取返回值:{result}")
finally:
print("關閉事件循環")
loop.close()

輸出

主協程
等待result1協程運行
這是result1協程
等待result2協程運行
這是result2協程
獲取返回值:(result1, result2接收了一個參數,result1)
關閉事件循環

協程中調用普通函數

在協程中可以通過一些方法去調用普通的函數。可以使用的關鍵字有call_soon,call_later,call_at。

call_soon

可以通過字面意思理解調用立即返回。

loop.call_soon(callback, *args, context=None)

在下一個迭代的時間循環中立刻調用回調函數,大部分的回調函數支持位置參數,而不支持」關鍵字參數」,如果是想要使用關鍵字參數,則推薦使用functools.aprtial()對方法進一步包裝.可選關鍵字context允許指定要運行的回調的自定義contextvars.Context。當沒有提供上下文時使用當前上下文。在Python 3.7中, asyncio

協程加入了對上下文的支持。使用上下文就可以在一些場景下隱式地傳遞變數,比如資料庫連接session等,而不需要在所有方法調用顯示地傳遞這些變數。下面來看一下具體的使用例子。

import asyncio
import functools

def callback(args, *, kwargs="defalut"):
print(f"普通函數做為回調函數,獲取參數:{args},{kwargs}")

async def main(loop):
print("註冊callback")
loop.call_soon(callback, 1)
wrapped = functools.partial(callback, kwargs="not defalut")
loop.call_soon(wrapped, 2)
await asyncio.sleep(0.2)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()

輸出結果

註冊callback
普通函數做為回調函數,獲取參數:1,defalut
普通函數做為回調函數,獲取參數:2,not defalut

通過輸出結果我們可以發現我們在協程中成功調用了一個普通函數,順序的列印了1和2。

有時候我們不想立即調用一個函數,此時我們就可以call_later延時去調用一個函數了。

call_later

loop.call_later(delay, callback, *args, context=None)

首先簡單的說一下它的含義,就是事件循環在delay多長時間之後才執行callback函數.

配合上面的call_soon讓我們看一個小例子

import asyncio

def callback(n):
print(f"callback {n} invoked")

async def main(loop):
print("註冊callbacks")
loop.call_later(0.2, callback, 1)
loop.call_later(0.1, callback, 2)
loop.call_soon(callback, 3)
await asyncio.sleep(0.4)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()

輸出

註冊callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked

通過上面的輸出可以得到如下結果:

1.call_soon會在call_later之前執行,和它的位置在哪無關2.call_later的第一個參數越小,越先執行。

call_at

loop.call_at(when, callback, *args, context=None)

call_at第一個參數的含義代表的是一個單調時間,它和我們平時說的系統時間有點差異,

這裡的時間指的是事件循環內部時間,可以通過loop.time()獲取,然後可以在此基礎上進行操作。後面的參數和前面的兩個方法一樣。實際上call_later內部就是調用的call_at。

import asyncio

def call_back(n, loop):
print(f"callback {n} 運行時間點{loop.time()}")

async def main(loop):
now = loop.time()
print("當前的內部時間", now)
print("循環時間", now)
print("註冊callback")
loop.call_at(now + 0.1, call_back, 1, loop)
loop.call_at(now + 0.2, call_back, 2, loop)
loop.call_soon(call_back, 3, loop)
await asyncio.sleep(1)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
print("進入事件循環")
loop.run_until_complete(main(loop))
finally:
print("關閉循環")
loop.close()

輸出

進入事件循環
當前的內部時間 4412.152849525
循環時間 4412.152849525
註冊callback
callback 3 運行時間點4412.152942526
callback 1 運行時間點4412.253202825
callback 2 運行時間點4412.354262512
關閉循環

因為call_later內部實現就是通過call_at所以這裡就不多說了。

Future

獲取Futrue裏的結果

future表示還沒有完成的工作結果。事件循環可以通過監視一個future對象的狀態來指示它已經完成。future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cancelled創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消,狀態為cancel。

import asyncio

def foo(future, result):
print(f"此時future的狀態:{future}")
print(f"設置future的結果:{result}")
future.set_result(result)
print(f"此時future的狀態:{future}")

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
loop.call_soon(foo, all_done, "Future is done!")
print("進入事件循環")
result = loop.run_until_complete(all_done)
print("返回結果", result)
finally:
print("關閉事件循環")
loop.close()
print("獲取future的結果", all_done.result())

輸出

進入事件循環
此時future的狀態:<Future pending cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>
設置future的結果:Future is done!
此時future的狀態:<Future finished result=Future is done!>
返回結果 Future is done!
關閉事件循環
獲取future的結果 Future is done!

`

可以通過輸出結果發現,調用set_result之後future對象的狀態由pending變為finished,Future的實例all_done會保留提供給方法的結果,可以在後續使用。

Future對象使用await

future和協程一樣可以使用await關鍵字獲取其結果。

import asyncio

def foo(future, result):
print("設置結果到future", result)
future.set_result(result)

async def main(loop):
all_done = asyncio.Future()
print("調用函數獲取future對象")
loop.call_soon(foo, all_done, "the result")

result = await all_done
print("獲取future裏的結果", result)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()

Future回調

Future 在完成的時候可以執行一些回調函數,回調函數按註冊時的順序進行調用:

import asyncio
import functools

def callback(future, n):
print({}: future done: {}.format(n, future.result()))

async def register_callbacks(all_done):
print(註冊callback到future對象)
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))

async def main(all_done):
await register_callbacks(all_done)
print(設置future的結果)
all_done.set_result(the result)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
loop.run_until_complete(main(all_done))
finally:
loop.close()

通過add_done_callback方法給funtrue任務添加回調函數,當funture執行完成的時候,就會調用回調函數。並通過參數future獲取協程執行的結果。

到此為止,我們就學會瞭如何在協程中調用一個普通函數並獲取其結果。

並發的執行任務

任務(Task)是與事件循環交互的主要途徑之一。任務可以包裝協程,可以跟蹤協程何時完成。任務是Future的子類,所以使用方法和future一樣。協程可以等待任務,每個任務都有一個結果,在它完成之後可以獲取這個結果。

因為協程是沒有狀態的,我們通過使用create_task方法可以將協程包裝成有狀態的任務。還可以在任務運行的過程中取消任務。

import asyncio

async def child():
print("進入子協程")
return "the result"

async def main(loop):
print("將協程child包裝成任務")
task = loop.create_task(child())
print("通過cancel方法可以取消任務")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("取消任務拋出CancelledError異常")
else:
print("獲取任務的結果", task.result())

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()

輸出

將協程child包裝成任務
通過cancel方法可以取消任務
取消任務拋出CancelledError異常

如果把上面的task.cancel()注釋了我們可以得到正常情況下的結果,如下。

將協程child包裝成任務
通過cancel方法可以取消任務
進入子協程
獲取任務的結果 the result

另外出了使用loop.create_task將協程包裝為任務外還可以使用asyncio.ensure_future(coroutine)建一個task。在python3.7中可以使用asyncio.create_task創建任務。

組合協程

一系列的協程可以通過await鏈式的調用,但是有的時候我們需要在一個協程裏等待多個協程,比如我們在一個協程裏等待1000個非同步網路請求,對於訪問次序有沒有要求的時候,就可以使用另外的關鍵字wait或gather來解決了。wait可以暫停一個協程,直到後臺操作完成。

等待多個協程

Task的使用

import asyncio

async def num(n):
try:
await asyncio.sleep(n*0.1)
return n
except asyncio.CancelledError:
print(f"數字{n}被取消")
raise

async def main():
tasks = [num(i) for i in range(10)]
complete, pending = await asyncio.wait(tasks, timeout=0.5)
for i in complete:
print("當前數字",i.result())
if pending:
print("取消未完成的任務")
for p in pending:
p.cancel()

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

輸出

當前數字 1
當前數字 2
當前數字 0
當前數字 4
當前數字 3
取消未完成的任務
數字5被取消
數字9被取消
數字6被取消
數字8被取消
數字7被取消

可以發現我們的結果並沒有按照數字的順序顯示,在內部wait()使用一個set保存它創建的Task實例。因為set是無序的所以這也就是我們的任務不是順序執行的原因。wait的返回值是一個元組,包括兩個集合,分別表示已完成和未完成的任務。wait第二個參數為一個超時值

達到這個超時時間後,未完成的任務狀態變為pending,當程序退出時還有任務沒有完成此時就會看到如下的錯誤提示。

Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0558>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e06d8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0738>()]>>

此時我們可以通過迭代調用cancel方法取消任務。也就是這段代碼

if pending:
print("取消未完成的任務")
for p in pending:
p.cancel()

gather的使用

gather的作用和wait類似不同的是。

1.gather任務無法取消。2.返回值是一個結果列表3.可以按照傳入參數的順序,順序輸出。我們將上面的代碼改為gather的方式

import asyncio

async def num(n):
try:
await asyncio.sleep(n * 0.1)
return n
except asyncio.CancelledError:
print(f"數字{n}被取消")
raise

async def main():
tasks = [num(i) for i in range(10)]
complete = await asyncio.gather(*tasks)
for i in complete:
print("當前數字", i)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

輸出

當前數字 0
當前數字 1
....中間部分省略
當前數字 9

gather通常被用來階段性的一個操作,做完第一步才能做第二步,比如下面這樣

import asyncio

import time

async def step1(n, start):
await asyncio.sleep(n)
print("第一階段完成")
print("此時用時", time.time() - start)
return n

async def step2(n, start):
await asyncio.sleep(n)
print("第二階段完成")
print("此時用時", time.time() - start)
return n

async def main():
now = time.time()
result = await asyncio.gather(step1(5, now), step2(2, now))
for i in result:
print(i)
print("總用時", time.time() - now)

if __name__ == __main__:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

輸出

第二階段完成
此時用時 2.0014898777008057
第一階段完成
此時用時 5.002960920333862
5
2
總用時 5.003103017807007

可以通過上面結果得到如下結論:

1.step1和step2是並行運行的。

2.gather會等待最耗時的那個完成之後才返回結果,耗時總時間取決於其中任務最長時間的那個。

任務完成時進行處理

as_complete是一個生成器,會管理指定的一個任務列表,並生成他們的結果。每個協程結束運行時一次生成一個結果。與wait一樣,as_complete不能保證順序,不過執行其他動作之前沒有必要等待所以後臺操作完成。

import asyncio
import time

async def foo(n):
print(Waiting: , n)
await asyncio.sleep(n)
return n

async def main():
coroutine1 = foo(1)
coroutine2 = foo(2)
coroutine3 = foo(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print(Task ret: {}.format(result))

now = lambda : time.time()
start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

輸出

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: 1
Task ret: 2
Task ret: 4
4.004292249679565

可以發現結果逐個輸出。

到此為止第一部分就結束了,對於asyncio入門級學習來說這些內容就夠了。如果想繼續跟進asyncio的內容,敬請期待後面的內容。

參考資料

  • The Python 3 Standard Library by Example
  • docs.python.org/3/libra

更多非同步內容請關注微信公眾號:python學習開發

推薦閱讀:

相關文章