原文链接:https://faculty.ai/tech-blog/a-guide-to-using-asyncio/

前言

-Andrew Crozier, Data Engineer
28 March 2019

在 Faculty,我们在 Scala 中构建了许多我们平台的后端服务。它是一种非常好的编程语言。我发现,函数式编程模型 和 强类型 对于编写经过良好测试的健壮软件非常高效。然而,当我们需要为我们的新job功能编写轻量级的代理时,我们认为运行 JVM 的计算资源的消耗过于昂贵。

于是,我们决定利用团队编写现代 Python 代码的经验,使用最近版本中添加的一些新功能,特别是使用协程和类型提示的异步编程。

在这篇文章中,我将分享我们在 Python 中使用 asyncio 开发异步代码的一些经验。

协程(Coroutines) 和 event loop

在深入研究一些代码示例之前,我想先解释一下使用协程进行异步编程的工作原理。

协程是一种协同工作的并发编程。每个协程都是一条执行线,可以挂起其对程序的控制以允许另一个协程运行。一个非常有用的情况是:一个协程正在等待诸如网络请求之类的操作完成,此时该协程正在等待,但其他有用的工作可以由其他协程完成。

协程编程系统的实现需要某种方式在代码中释放对程序的控制,并在以后恢复(下面我们将介绍 Python 这方面的语法),除此之外,还需要一个系统用于管理活动协程的执行。这通常通过 event loop 来实现。event loop 保持着对活动协程的追踪,当一个协程释放控制权时, event loop 将控制权传递给另一个。

对这些概念的基本了解是很有用的,因为它有助于我们理解正在使用的库的语法和目的。整篇文章中我将多次提到 event loop 和协程。

用 Python 编写协程

Python的3.5版本引入了 async/await 语法。这些关键字允许您声明协程并使用它们。例如,使用 async def 声明一个协程:

async def example():
    print("Example is running")

上面的代码让你声明了一个协程,但要运行它,需要明确地在 event loop 上运行它。如果你直接调用,它只是会返回一个实际上没有运行的coroutine对象:

>>> example()
<coroutine object example at 0x10b2bb4c0>

您可以自由使用任意您喜欢的 event loop 实现,但是标准库提供了 asyncio,这是一个常用的选择,许多第三方工具(例如aiohttp) 都在此基础上构建。使用 asyncio 运行我们的协程时,请获取一个 event loop 并将我们上面得到的coroutine对象传递给它:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(example())
Example is running
>>> loop.close()

注意: Python的3.7版本添加了 asyncio.run(),它会创建一个 event loop 并为您运行一个协程。在这种情况下,上面的例子就变得更简单了:asyncio.run(example())。在示例的其余部分中,我将使用asyncio.run(),但如果你使用较旧的 Python 版本,就要调整代码,创建 event loop 并调用run_until_complete()

让出控制

我们已经介绍了编写基本的协程。但是,正如前面所说,只有在它们让出对 event loop 的控制时,才算真正发挥作用,因为这样才可以让其他协程做工作。为此,您需要 await 另一个协程(或 asyncio 提供的其他 awaitable(可等待) 对象):

async def inner():
    print("inner coroutine")

async def example():
    await inner()

在这个例子中,在我们 await 内部协程的地方,让出了对 event loop 的控制,允许执行 event loop 上的其他协程。

如果被调用的协程在完成时返回某些内容,则该 await 语句将在协程完成时返回它。在以下示例中,我们打印了 get_message 返回的结果:

>>> async def get_message():
>>>     return "Coroutines are great"
>>>
>>> async def example():
>>>     message = await get_message()
>>>     print(message)
>>>
>>> asyncio.run(example())
Coroutines are great

一个更完整的例子

让协程对 event loop 放开控制最有帮助的情况是,我们预计将不得不等待一段空闲时间直到可以完成工作。下面我们用asyncio.sleep(它会等待指定的秒数)来模拟这种情况:

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Use asyncio.gather to run two coroutines concurrently:
    await asyncio.gather(
        print_after("world!", 2),
        print_after("Hello", 1)
    )

asyncio.run(main())

运行此示例会打印出:

Hello
world!

协程什么时候开始运行?

使用asyncio实现协程的一个常见的“陷阱”是,有时需要在 event loop 中显式地调度它们。考虑下面的例子,我试图重现与上面使用 asyncio.gather 时相同的行为:

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = print_after("world!", 2)
    second_awaitable = print_after("Hello", 1)
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

然而,当运行它时,我得到以下输出。然而我期望 “Hello” 首先被打印:

world!
Hello

修改下示例,在协程执行的开始和结束都打印信息,问题的原因变得清楚了:

import asyncio

async def example(message):
    print("start of example():", message)
    await asyncio.sleep(1)
    print("end of example():", message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = example("First call")
    second_awaitable = example("Second call")
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

运行上述结果会产生以下输出:

start of example(): First call
end of example(): First call
start of example(): Second call
end of example(): Second call

出现的问题是:asyncio 在第一个调用example()完成之前不会开始执行第二个调用 。

这一切都是因为:只有被明确地注册(例如使用asyncio.run())或者在另一个协程中 await 时,asyncio才会开始执行一个协程。所以,如果我们想启动多个协程并让它们像之前那样并发运行,我们可以像前面的例子那样使用 asyncio.gather(),或者使用asyncio.create_task()单独调度它们 :

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = asyncio.create_task(print_after("world!", 2))
    second_awaitable = asyncio.create_task(print_after("Hello", 1))
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

修改后的代码片会立即开始运行两个协程并等待它们完成,从而符合预期地首先打印“Hello”:

Hello
world!

注意: asyncio.create_task() 在 Python 3.7 中引入。在旧版本的 Python 中,请改用 asyncio.ensure_future()

在 asyncio 中运行命令

我们开发的作业代理的主要职责是安装批处理作业的依赖项,然后运行作业本身。这些步骤中的每一个都需要运行可能持续很久的 shell 命令,同时通过 HTTP 服务在计算机上记录它们的输出以及CPU和内存的利用率。

使用基于协程的并发非常适合这种模型,因为对于程序的大部分执行时间,它都在等待其他进程或网络 I/O。因此,我们决定使用 Python 协程和 asyncio 的子进程实现来实现我们的代理。

asyncio 为运行命令提供了一个接口,与 Python 标准库中的 subprocess 非常相似:

import asyncio

async def echo(string):
    process = await asyncio.create_subprocess_exec("echo", string)
    await process.wait()

asyncio.run(echo("Hello, world!"))

进程是使用 asyncio.create_subprocess_exec() 创建的(它本身就是一个协程,因此需要被等待)。进程对象上的一些方法也是协程(如上面例子的.wait())。

使用 aiohttp 的异步 HTTP

我们的工作代理还负责将信息发送回中央跟踪服务器,例如,确定工作的健康状况并允许中央服务在工作变得不健康时采取行动。

发出网络请求是另一个 I/O 操作,非常适合基于协程的并发编程模型。我们使用 aiohttp,一个基于 asyncio 的 HTTP 库,将监控的信息发送回我们跟踪的服务。

使用 aiohttp 发出 HTTP 请求:

import aiohttp
import asyncio

async def fetch_and_print(url):
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        print(await response.text())

asyncio.run(fetch_and_print("https://python.org/"))

上面的示例使用 aiohttp.ClientSession 作为 异步上下文管理器(asynchronous context manager)async wait 语法。这与 Python 中的 标准上下文管理器 的工作方式非常相似,只是,控制进入和退出上下文的代码是在协程中实现的,因此也可以异步执行。在这种情况下,将 session 用作上下文管理器可确保在我们完成后将其关闭。

把它们放在一起

使用上面的工具,我们现在可以组合一个简单版本的作业运行代理,使用 asyncio 让所有内容并发运行。

代理将:

  • 向跟踪服务器发送通知以表示它已启动
  • 定期向跟踪服务器发送心跳,以便它知道作业仍然健康
  • 运行作业命令
  • 在作业完成时向跟踪服务器发送通知,指明它是成功还是失败。

我已经把完整的例子 放在了GitHub上,还有一个简单的后端可以用来测试它。agent.py 是一个示例,它在运行命令时使用协程向跟踪服务器发送心跳。

总结

协程是在执行 I/O任务(例如运行系统命令和处理网络请求)的 Python 程序中实现并发的好方法。我已经演示了使用它的一些基础知识,并提供了一个更完整的示例应用程序。但我还是鼓励您亲自尝试一下,看看它如何满足您的需求。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐