理解事件驅動型I/O

我們可能聽說過某些Python的包是基於「事件驅動「或者」非同步「I/O的,但是我們並不能完全理解它到底是什麼意思,在底層是如何工作的,或者說如果使用了這樣的技術會對程序產生什麼樣子的影響

從根本上來說,事件驅動IO是一種將基本的IO操作轉化成事件的技術,而我們必須在程序中去處理這些事件。比如說當在socket上收到數據的時候,這就成為一個」接收事件「, 由我們提供的回調方法或者是函數來負責處理以此來響應這個事件。一個時間驅動型框架可能會以一個基類作為起始點,實現一系列基本的事件處理方法就可以像這樣:

class EventHandler:
def fileno(self):
Return the associated file descriptor
raise NotImplemented(must implement)

def wants_to_receive(self):
Return True if receiving is allowed
return False

def handle_receive(self):
Perform the receive operation
pass

def wants_to_send(self):
Return True if sending is requested
return False

def handle_send(self):
Send outgoing data
pass

之後,我們就可以把這個類的實例插入到一個事件循環之中了,看起來就像是這樣:

import select

def event_loop(handlers):
while True:
wants_recv = [h for h in handlers if h.wants_to_receive()]
wants_send = [h for h in handlers if h.wants_to_send()]
can_recv, can_send, _ = select.select(wants_recv, wants_send, [])
for h in can_recv:
h.handle_receive()
for h in can_send:
h.handle_send()

事件循環的關鍵部分是 select() 調用,它會不斷輪詢文件描述符從而激活它。 在調用 select()之前,事件循環會詢問所有的處理器來決定哪一個想接受或發生。 然後它將結果列表提供給 select() 。然後 select() 返回準備接受或發送的對象組成的列表。 然後相應的 handle_receive() 或 handle_send() 方法被觸發。

編寫應用程序的時候,EventHandler 的實例會被創建。例如,下面是兩個簡單的基於UDP網路服務的處理器例子:

import socket
import time

class UDPServer(EventHandler):
def __init__(self, address):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(address)

def fileno(self):
return self.sock.fileno()

def wants_to_receive(self):
return True

class UDPTimeServer(UDPServer):
def handle_receive(self):
msg, addr = self.sock.recvfrom(1)
self.sock.sendto(time.ctime().encode(ascii), addr)

class UDPEchoServer(UDPServer):
def handle_receive(self):
msg, addr = self.sock.recvfrom(8192)
self.sock.sendto(msg, addr)

if __name__ == __main__:
handlers = [ UDPTimeServer((,14000)), UDPEchoServer((,15000)) ]
event_loop(handlers)

實現一個TCP伺服器會更加複雜一點,因為每一個客戶端都要初始化一個新的處理器對象。 下面是一個TCP應答客戶端例子:

class TCPServer(EventHandler):
def __init__(self, address, client_handler, handler_list):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.sock.bind(address)
self.sock.listen(1)
self.client_handler = client_handler
self.handler_list = handler_list

def fileno(self):
return self.sock.fileno()

def wants_to_receive(self):
return True

def handle_receive(self):
client, addr = self.sock.accept()
# Add the client to the event loops handler list
self.handler_list.append(self.client_handler(client, self.handler_list))

class TCPClient(EventHandler):
def __init__(self, sock, handler_list):
self.sock = sock
self.handler_list = handler_list
self.outgoing = bytearray()

def fileno(self):
return self.sock.fileno()

def close(self):
self.sock.close()
# Remove myself from the event loops handler list
self.handler_list.remove(self)

def wants_to_send(self):
return True if self.outgoing else False

def handle_send(self):
nsent = self.sock.send(self.outgoing)
self.outgoing = self.outgoing[nsent:]

class TCPEchoClient(TCPClient):
def wants_to_receive(self):
return True

def handle_receive(self):
data = self.sock.recv(8192)
if not data:
self.close()
else:
self.outgoing.extend(data)

if __name__ == __main__:
handlers = []
handlers.append(TCPServer((,16000), TCPEchoClient, handlers))
event_loop(handlers)

TCP例子的關鍵點是從處理器中列表增加和刪除客戶端的操作。 對每一個連接,一個新的處理器被創建並加到列表中。當連接被關閉後,每個客戶端負責將其從列表中刪除。 如果你運行程序並試著用Telnet或類似工具連接,它會將你發送的消息回顯給你。並且它能很輕鬆的處理多客戶端連接。

實際上所有的事件驅動框架原理跟上面的例子相差無幾。實際的實現細節和軟體架構可能不一樣, 但是在最核心的部分,都會有一個輪詢的循環來檢查活動socket,並執行響應操作。

事件驅動I/O的一個可能好處是它能處理非常大的並發連接,而不需要使用多線程或多進程。 也就是說,select() 調用(或其他等效的)能監聽大量的socket並響應它們中任何一個產生事件的。 在循環中一次處理一個事件,並不需要其他的並發機制。

事件驅動I/O的缺點是沒有真正的同步機制。 如果任何事件處理器方法阻塞或執行一個耗時計算,它會阻塞所有的處理進程。 調用那些並不是事件驅動風格的庫函數也會有問題,同樣要是某些庫函數調用會阻塞,那麼也會導致整個事件循環停止。

對於阻塞或耗時計算的問題可以通過將事件發送個其他單獨的現場或進程來處理。 不過,在事件循環中引入多線程和多進程是比較棘手的, 下面的例子演示了如何使用 concurrent.futures 模塊來實現:

from concurrent.futures import ThreadPoolExecutor
import os

class ThreadPoolHandler(EventHandler):
def __init__(self, nworkers):
if os.name == posix:
self.signal_done_sock, self.done_sock = socket.socketpair()
else:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((127.0.0.1, 0))
server.listen(1)
self.signal_done_sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
self.signal_done_sock.connect(server.getsockname())
self.done_sock, _ = server.accept()
server.close()

self.pending = []
self.pool = ThreadPoolExecutor(nworkers)

def fileno(self):
return self.done_sock.fileno()

# Callback that executes when the thread is done
def _complete(self, callback, r):

self.pending.append((callback, r.result()))
self.signal_done_sock.send(bx)

# Run a function in a thread pool
def run(self, func, args=(), kwargs={},*,callback):
r = self.pool.submit(func, *args, **kwargs)
r.add_done_callback(lambda r: self._complete(callback, r))

def wants_to_receive(self):
return True

# Run callback functions of completed work
def handle_receive(self):
# Invoke all pending callback functions
for callback, result in self.pending:
callback(result)
self.done_sock.recv(1)
self.pending = []

發送和接收大型數組

我們想通過網路來發送和接受大型數組,其中的數據都是連續的,並且要求儘可能少地對數據進行拷貝

下面的函數利用memoryviews來發送和接受大數組:

# zerocopy.py

def send_from(arr, dest):
view = memoryview(arr).cast(B)
while len(view):
nsent = dest.send(view)
view = view[nsent:]

def recv_into(arr, source):
view = memoryview(arr).cast(B)
while len(view):
nrecv = source.recv_into(view)
view = view[nrecv:]

為了測試程序,首先創建一個通過socket連接的伺服器和客戶端程序:

>>> from socket import *
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.bind((, 25000))
>>> s.listen(1)
>>> c,a = s.accept()
>>>

在客戶端(另外一個解釋器中):

>>> from socket import *
>>> c = socket(AF_INET, SOCK_STREAM)
>>> c.connect((localhost, 25000))
>>>

在數據密集型分散式計算和平行計算程序中,自己寫程序來實現發送/接受大量數據並不常見。 不過,要是你確實想這樣做,你可能需要將你的數據轉換成原始位元組,以便給低層的網路函數使用。 你可能還需要將數據切割成多個塊,因為大部分和網路相關的函數並不能一次性發送或接受超大數據塊。

一種方法是使用某種機制序列化數據——可能將其轉換成一個位元組字元串。 不過,這樣最終會創建數據的一個複製。 就算你只是零碎的做這些,你的代碼最終還是會有大量的小型複製操作。

本節通過使用內存視圖展示了一些魔法操作。 本質上,一個內存視圖就是一個已存在數組的覆蓋層。不僅僅是那樣, 內存視圖還能以不同的方式轉換成不同類型來表現數據。 這個就是下面這個語句的目的:

view = memoryview(arr).cast(B)

它接受一個數組 arr並將其轉換為一個無符號位元組的內存視圖。這個視圖能被傳遞給socket相關函數, 比如 socket.send() 或 send.recv_into() 。 在內部,這些方法能夠直接操作這個內存區域。例如,sock.send() 直接從內存中發生數據而不需要複製。 send.recv_into() 使用這個內存區域作為接受操作的輸入緩衝區。

剩下的一個難點就是socket函數可能只操作部分數據。 通常來講,我們得使用很多不同的 send()和 recv_into() 來傳輸整個數組。 不用擔心,每次操作後,視圖會通過發送或接受位元組數量被切割成新的視圖。 新的視圖同樣也是內存覆蓋層。因此,還是沒有任何的複製操作。

這裡有個問題就是接受者必須事先知道有多少數據要被發送, 以便它能預分配一個數組或者確保它能將接受的數據放入一個已經存在的數組中。 如果沒辦法知道的話,發送者就得先將數據大小發送過來,然後再發送實際的數組數據。

參考書目

《Python CookBook》作者:【美】 David Beazley, Brian K. Jones

Github地址:

yidao620c/python3-cookbook?

github.com
圖標

推薦閱讀:
相关文章