asyncio是python从3.5(?)以后引入的非同步协程技术。其特点是,当代码执行到IO请求时,可以将CPU资源出让出去,以便运行其它代码逻辑;待IO完成后,继续之前的代码执行。协程切换与线程切换比较类似,但协程切换更轻,不需要操作系统参与(没有栈切换操作,也没有用户态与内核态切换)。

使用asyncio要注意掌握以下几点:

  1. 通过async def来定义一个协程函数,通过await来执行一个协程对象。协程对象、协程函数的概念如下所示:

async def foo(): # 1. 定义了一个协程函数
pass

async def bar(): # 2. 注意要在函数内部调用协程函数,自身也必须定义为协程
# 3. foo()调用产生了一个协程对象,通过await来执行这个协程。如果不加await,
# 直接以foo()方式调用,则foo中代码并不会执行。
await foo()

2. 一般情况下,无法在一个非协程函数中阻塞地调用另一个协程。如上述注释2所示。但你可以通过asyncio.ensure_future()来非同步执行这个协程,如下所示:

import asyncio
async def foo(): # 1. 定义了一个协程函数
pass

def bar():
asyncio.ensure_future(foo()) # 这里foo()将会在某个时间执行,具体执行顺序未知
print("after/before foo() is executed?")
# 这里是阻塞执行foo(),但这种调用,只能在event loop进入循环之前调用(loop.run_forever()),
# 否则会抛异常
asyncio.get_event_loop().run_until_complete(foo)
print("foo() is executed!")

在一些框架中,会将某些函数定义为协程(即通过async修饰),这些函数都是在某个地方通过create_task,或者ensure_future来进行调度的。

3. 协程可能由于两种原因导致执行中断,其一是它们属于不同的线程,随线程切换而切换;其二是协程执行到了IO调用时,发生切换。因此,协程之间也可能会有资源共享冲突。要防止资源共享冲突产生的数据一致性问题,需要使用asyncio.Lock。asyncio.Lock也遵从上下文管理协议。

4. 一个函数即使没有使用到io,也可以定义成协程。但在调用时,必须通过协程调用语法来完成调用(即通过await foo())。

5. 协程并不是多线程,协程函数在执行中会占用本线程的全部CPU时间,除非遇到IO切换出去。因此,如果你在函数中使用sleep(24*3600),当你分别使用多线程技术和协程技术时,两者的表现是完全不同的。在多线程中,一个线程进入sleep状态,操作系统会切换到其它线程执行,整个程序仍然是可响应的(除了该线程,它必须等待睡眠状态结束);而对协程来说,同一loop中的其它协程都不会得到执行,因为这个sleep会占用本线程的全部执行时间,直到协程执行完毕。

上面的问题引出一个推论,也就是如果一个协程确实需要睡眠(比如某种定时任务),必须使用asyncio.sleep()

6. 如果我们要通过asyncio来远程调用一个服务,应该如何封装呢?假设你使用的底层通讯的API是发送和接收分离的(一般比较靠近底层的API都是这样设计的),那么你会面临这样的问题:当你通过非同步请求(比如send)发出API request后,伺服器的响应可能是通过on_message这样的API来接收的。如何让程序在调用send之后,就能得到(形式上)返回结果,然后根据返回结果继续执行呢?

from typing import Dict

# 全局事件注册表。键为外发请求的track_id,该track_id需要伺服器在响应请求时传回。
# 值为另一个dict,储存著对应的asyncio.Event和网路请求的返回结果。这里也可以使用list。
# 在强调性能的场合下,使用List[event: asyncio.Event, result: object]更好。
_events: Dict[str, Dict] = {}

# 定义阻塞调用的协程
async def sync_call(request):
event = asyncio.Event()
track_id = str(uuid.uuid4())
_events[track_id] = {
"events": event,
"result": None
}

# 发送网路请求,以下仅为示例。具体网路请求要根据业务具体场景来替换。这一步一般是立即返回,
# 伺服器并没有来得及准备好response
await aiohttp.request(...)

# L1: 阻塞地等待事件结果。当框架(或者你的网路常式)收到伺服器返回结果时,根据track_id
# 找到对应的event,触发之
await event.wait()

# 获取结果,并做清理
response = _events[track_id].get("result")
_events.pop(track_id)

return response

# 在框架(或者你的网路常式)的消息接收处,比如on_message函数体中:
async def on_message(response):
# 如果伺服器不传回track_id,则整个机制无法生效
track_id = response.get("track_id")

waited = _events.get(track_id)
if waited:
waited["result"] = response
waited["event"].set() # !这里唤醒在L1处等待执行的代码

推荐阅读:

相关文章