本页讨论了异步 Python 的常见任务流程,以及使用异步 Python 时可能遇到的陷阱。
在继续操作之前,请先查看有关 asyncio 的官方文档和有关 Fuchsia 控制器的教程。
背景
以下各项涉及的是一般异步代码,不一定专门针对 Python:
- 异步代码不一定是多线程的。对于 Fuchsia 控制器相关代码,几乎所有内容都是单线程的。
- 异步代码在循环(也称为执行器)内运行,每当发生 await 时,该循环就会让出。
因此,这可能会导致某些事情难以调试,例如,如果您有一个打算永远运行但会引发异常的后台任务。如果您从不等待此任务,则异常仅在日志中可见。
常见误区
未保留对任务的引用
如果您在循环中启动任务,例如:
# Don't do this!
asyncio.get_running_loop().create_task(foo_bar())
并且您没有持有对返回值的引用,那么 Python 垃圾收集器可能会在未来的任意时间点取消并删除此任务。
请务必将您创建的任务存储在变量中,例如:
# Always store a variable!
foo_bar_task = asyncio.get_running_loop().create_task(foo_bar())
然后,确保任务的生命周期与需要它的内容相关联。如果这是一个特定的测试用例,请将任务存储为测试用例类的成员,并在拆解期间取消该任务。
将阻塞代码与异步代码混合使用
在测试代码时,某些实验通常会针对特定事件抛出 sleep 语句。请勿将 asyncio.sleep 与 time.sleep 混淆。前者返回一个协程,而后者会阻塞整个循环。
例如:
import asyncio
import time
async def printer():
""" Prints a thing."""
print("HELLO")
async def main():
task = asyncio.get_running_loop().create_task(printer())
time.sleep(1) # THIS WILL BLOCK EXECUTION.
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
上述代码不会输出任何内容。不过,如果您将 time.sleep(1) 替换为 await asyncio.sleep(1)(会产生异步循环),则会打印 HELLO。
FIDL 服务器错误
鉴于异步编程的性质,您的 FIDL 服务器实现可能会出现运行时错误,而您却不知道,尤其是当您的 FIDL 服务器实现连接到 Fuchsia 设备上的组件时。
遗憾的是,这很难调试,并且可能只会因 FIDL 服务器实现崩溃而在设备上显示为 PEER_CLOSED 错误。
一种方法(用于调试)是向任务添加一个回调,以检查任务中是否发生了异常,并将某些内容发送到日志(甚至使程序硬崩溃)。
例如:
def log_exception(task):
try:
_ = task.result()
except Exception as e:
# Can log the exception here for debugging.
task = asyncio.get_running_loop().create_task(foobar())
task.add_done_callback(log_exception)
常见任务流程
同步任务
首先,建议您查看 Python 文档页面,了解可用的同步对象。请注意,这些对象不是线程安全的。 不过,其中一些值得举例说明。
等待多个任务
您可能需要等待多个任务完成。您可以使用 asyncio.wait 实现此目的。如果您想执行与 select 系统调用类似的操作,请将 return_when 参数设置为 asyncio.FIRST_COMPLETED,例如:
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED
)
结果包含一个任务元组(在本例中为 done 和 pending),可以像处理任何其他任务对象一样处理该元组。因此,对于 done 对象,可以遍历这些对象,并通过检查 result() 函数来收集结果。
在同步代码中运行异步代码
在撰写本文时,大多数 Fuchsia 控制器代码都是从同步代码调用的。为了确保任务可以在后台运行,通过 fuchsia_async_extension.get_loop() 保留循环实例可能是有意义的。
这是因为 asyncio 任务必须保持在单个循环中。对于 asyncio.Queue 等同步对象也是如此。
如果您交互的代码足够简单(一次运行一个 FIDL 方法),则可以使用以下方法:
fuchsia_async_extension.get_loop().run_until_complete(...)
但如果您需要运行任务或处理同步,则需要将相关内容封装在一个包含循环的类中。只需确保,如果此代码位于测试或对象中,则有用于关闭循环的拆解代码,例如:
class FidlHandlingClass():
def __init__(self):
self._loop = fuchsia_async_extension.get_loop()
# Can launch tasks here.
self._important_task = self._loop.create_task(foo())
def __del__(self):
self._important_task.close()
self._loop.close()
您还可以使用装饰器扩展该类,以便在所有公共函数中隐式使用该循环(尽管对于调用者而言,这些函数看起来是同步的),例如:
from functools import wraps
class FidlInstance():
def __init__(self, proxy_channel):
self._loop = fuchsia_async_extension.get_loop()
self.echo_proxy = fidl_fuchsia_developer_ffx.EchoClient(proxy_channel)
def __del__(self):
self._loop.close()
def _asyncmethod(f):
@wraps(f)
def wrapped(inst, *args, **kwargs):
coro = f(inst, *args, **kwargs)
inst._loop.run_until_complete(coro)
return wrapped
@_asyncmethod
async def echo(self, string: str):
return await self.echo_proxy.echo_string(value=string)
然后,可以使用以下代码调用上述类:
client_channel = ...
inst = FidlInstance(client_channel)
print(inst.echo("foobar"))
虽然此实例可以在非异步环境中运行,但它仍然运行异步代码。不过,这可以使整个代码更易于读写。
调整代码以支持异步 Python
FIDL 是一种围绕异步性设计的语言。不过,Mobly 等测试框架需要同步 Python 函数才能运行测试。如果您有想要测试的接口,并且该接口主要在同步 Python 中使用,那么本指南适合您。
fuchsia-controller 在其 wrappers 模块下附带了一些 mixin 代码,以支持在同步上下文中使用异步代码,尤其是 AsyncAdapter(可用作封装容器或 mixin)和装饰器 asyncmethod。
当它们一起使用时,会创建一个设置,其中给定实例中运行的所有代码都将针对一个共同的 asyncio 事件循环执行。
在大多数情况下,可以使用 asyncio.run() 在同步 Python 中运行异步 Python。不过,如果您有队列或异步任务需要在函数调用之间检查状态,那么这样做会非常困难。每次调用 asyncio.run() 都会创建一个全新的事件循环,这可能会给管理异步数据结构带来麻烦,因为异步数据结构一次只能在一个事件循环中使用。通过保持单个 asyncio 事件循环,您可以避免遇到这些异常情况。
例如,假设您正在编写 Mobly 测试用例,但您公开的类依赖于一些底层异步 Python,您可以按如下方式编写代码,以便在 Mobly 中使用该类:
import asyncio
from mobly import asserts, base_test, test_runner
from fuchsia_controller_py.wrappers import AsyncAdapter, asyncmethod
# AsyncAdapter should be included first in the list of base classes.
class ClassYouWantToTest(AsyncAdapter, SomeBaseClass):
def __init__(self):
super().__init__()
self.async_init()
@asyncmethod
async def async_init(self):
self.queue: asyncio.Queue[int] = asyncio.Queue()
await self.queue.put(1)
@asyncmethod
async def function_we_care_about(self) -> int:
got = await self.queue.get()
self.queue.task_done()
return got
class ExampleTest(base_test.BaseTestClass):
def test_case_example(self) -> None:
"""Example... doesn't really do much useful."""
c = ClassYouWantToTest()
asserts.assert_equal(c.function_we_care_about(), 1)
if __name__ == "__main__":
test_runner.main()
方法 function_we_care_about 将被封装,以便在执行 Mobly 测试用例时,从功能上执行以下代码:
def function_we_care_about(self) -> None:
coro = self._function_we_care_about_impl()
self._mixin_asyncio_loop.run_until_complete(coro)