python协程进阶,原来实现一个事件循环可以如此简单!
引言
目前很多公司选择将python项目使用golang重构,很大一方面原因是因为golang的并发能力,golang自带的语法糖支持使并发编程变的相对简单,也更能充分的使用多核CPU的计算资源。
相应的,python长期受制于GIL,无法在多线程时使用多核CPU,所以一直以来在谈及python的缺陷时,性能总是无法回避的一个问题。当然,一些python著名的第三方组织也一直通过各种手段来改善python的并发性能,如twisted的非同步模型使用事件驱动机制来提升python性能,著名的爬虫框架scrapy便是以twisted作为底层网路库来开发的,还有gevent,它使用greenlet在用户态完成栈和上下文切换来减少切换带来的性能损耗,同样还有著名的web协程框架tornado,他使用生成器来保存协程上下文及状态,使用原生的python语法实现了协程。但从python3.4开始python引入asyncio标准库,随后又在3.5引入async/await关键字,从根本上规范了python非同步编程标准,使python非同步编程逐渐流行起来。
关于什么是python协程,相信网上已经有了不少资料,但是只描述抽象的上层建筑难免会让人接受困难,本文希望可以通过从最简单的代码和逻辑,使用最基础的数据结构,从实现出发,带领大家理解什么是python协程。
首先需要补充一些基础知识
什么是生成器
我们都应该听说过迭代器,这在很多语言中都有类似的概念,简单的说,迭代器就是可以被迭代的对象,对其使用next操作可以返回一个元素,通常多次迭代后迭代器会中止,此时迭代器无法再使用。比如python中可以通过iter方法来将一个列表转换成迭代器:
In [1]: lst = [1, 2, 3]
In [2]: iterator = iter(lst)
In [3]: next(iterator)
Out[3]: 1
In [4]: next(iterator)
Out[4]: 2
In [5]: next(iterator)
Out[5]: 3
In [6]: next(iterator)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-7-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:
生成器可以看作是迭代器的子类,同时提供了比迭代器更强大的功能,python中,可以使用yield关键字使函数返回生成器对象。
In [8]: def fun():
...: yield 1
...: yield 2
...: yield 3
...:
In [9]: iterator = fun()
In [10]: next(iterator)
Out[10]: 1
In [11]: next(iterator)
Out[11]: 2
In [12]: next(iterator)
Out[12]: 3
In [13]: next(iterator)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-13-4ce711c44abc> in <module>()
----> 1 next(iterator)
StopIteration:
每次next调用, fun函数只执行四分之一,如果我们拥有多个生成器对象,按照一定规则可控的对他分别调用next,生成器每次的暂停都保存了执行进度和内部状态。如果将这三个生成器理解成协程,那不正是我们熟悉的协程间的切换?
事件循环
所以,我们可以想像,现在有一个循环和一个生成器列表,每次循环,我们都将所有的生成器进行一次调用,所有生成器交替执行。如下:
In [16]: gen_list = [fun(), fun(), fun()]
In [17]: while True:
...: for gen in gen_list:
...: print(next(gen))
...:
1
1
1
2
2
2
3
3
3
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-17-f2c1d557da29> in <module>()
1 while True:
2 for gen in gen_list:
----> 3 print(next(gen))
4
StopIteration:
当然,我们还可以换一种写法,将生成器的每一步都当成是一次调用,把生成器包装成一个Handle对象,每次调用handle对象的call来完成生成器的调用,同时,我们还可以在调用完成后做一些准备来控制下一次调用的时间,将Handle对应放到一个scheduled_list里面:
def fun():
print("step1")
yield
print("step2")
yield
print("step3")
yield
scheduled_list = []
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
next(self.gen)
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
if __name__ == "__main__":
loop(fun(), fun(), fun())
协程中的阻塞
在有了以上的基础后,我们来分析上面提到的切换规则,什么时候应该切换协程(生成器)?显而易见,当遇到阻塞时,我们才需要切换协程,以避免CPU的浪费。我将阻塞分为了以下三种:
- IO调用,如socket,file,pipe等。
- 人为制造的阻塞,如sleep。
- 非同步调用。
假设,在我们的生成器内有一次socket调用,我们不知道它多久会ready,我们希望不等待它的返回,切换到其它协程运行,等其准备好之后再切换回来,该怎么办?
有同学可能会想到了,将socket注册到epoll上。如下:
import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done(sock.recv(1024))
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
class Future(object):
def __init__(self):
self.callbacks = []
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlers[fd]()
poll.unregister(fd)
del handlers[fd]
if __name__ == "__main__":
loop(fun(), fun(), fun())
这一步引入一个新的对象Future,他用来代指未来即将发生的调用,通过epoll上注册的事件,触发了它的完成,完成之后执行了将handle对象放回scheduled_list, 可从而切回了协程。
那么,人为制造的阻塞我们怎么切换协程呢?这里,我们又引入了一个新的对象Timeout
import time
import socket
from functools import partial
from select import epoll
poll = epoll()
handlers = dict()
scheduled_list = []
# 创建一个timeout_list
timeout_list = []
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
yield
def add_handler(fd, handler, events):
handlers[fd] = handler
poll.register(fd, events)
def sleep(sec):
future = Future()
timeout = Timeout(sec, future.set_done)
timeout_list.append(timeout)
return future
class Timeout(object):
def __init__(self, timeout, callback):
self.deadline = time.time() + timeout
self.callback = callback
def call(self):
self.callback(None)
class Future(object):
def __init__(self):
self.callbacks = []
self.value = None
def add_callback(self, callback):
self.callbacks.append(callback)
def set_done(self, value):
self.value = value
for callback in self.callbacks:
callback()
def get_result(self):
return self.value
class Handle(object):
def __init__(self, gen):
self.gen = gen
def call(self):
yielded = next(self.gen)
if isinstance(yielded, Future):
yielded.add_callback(partial(scheduled_list.append, self))
else:
scheduled_list.append(self)
def loop(*coroutines):
scheduled_list.extend(Handle(c) for c in coroutines)
while True:
default_timeout = 10000
deadline = time.time()
for timeout in timeout_list[:]:
if timeout.deadline <= deadline:
timeout.call()
timeout_list.remove(timeout)
while scheduled_list:
handle = scheduled_list.pop(0)
handle.call()
for timeout in timeout_list:
wait_time = timeout.deadline - deadline
if wait_time <= 0:
wait_time = 0
default_timeout = min(default_timeout, wait_time)
if not scheduled_list and not timeout_list and not handlers:
break
# 等待描述符可操作
events = poll.poll(default_timeout)
while events:
fd, event = events.popitem()
handlers[fd]()
poll.unregister(fd)
del handlers[fd]
if __name__ == "__main__":
loop(fun(), fun(), fun())
通过创建一个Timeout对象,我们在deadline时触发了其回调,使Future完成,从而完成了协程的切换。
由以上两点,我们可以大致观察出一个规律,创建Future对象,切出协程,在合适的时机(如socket ready或到达deadline/timeout)让他完成,切入协程,这才是协程切换的关键所在,由此,我们可以使用Future来管理各种非同步调用。
如,我们在python编码时遇到了一个计算密集型的函数,由于python单进程无法利用多核,我们可以创建一个子进程来处理计算,同时关联到一个Future中:
def fun():
print("step1")
sock = socket.socket()
future = Future()
def handler():
future.set_done()
add_handler(sock.fileno(), handler, READ)
yield future
print("step2")
yield sleep(3)
print("step3")
future = Future()
from multiprocessing import Process
Process(target=long_time_call, args=(future, )).start()
yield future
def long_time_call(future):
#...
future.set_done()
当协程执行到第三步时,遇到了长时间运行的函数调用,我们创建了一个Future,关联到一个子进程中,并在子进程完成时设置future完成,在子进程完成之前,父进程已完成协程的切出,将执行权交给其它协程执行。
这个地方遗漏了一个细节,当没有其它协程可以执行时,epoll会被设置成超时时间=10000,因而陷入到长时间的睡眠中,而子进程完成后需要切入协程,但父进程已经被epoll阻塞掉,如何唤醒主进程继续执行该协程呢?业界通用的做法是,创建一个管道,在切出协程时让epoll监听读fd,子进程完成后,往管道中写入一个字元,epoll监听的读fd 马上变成ready,因此epoll解除阻塞,事件循环得以继续执行。
当然,非同步调用不仅仅可以使用子进程,子线程、远程计算框架都可以通过这种方式执行。
讲到这里,大家应该基本明白了一个协程函数是如何工作的了。下表可帮助我们从线程的角度理解协程