线程安全异步代码

使用多个线程编写正确的异步程序需要在 C++ 中谨慎。在这里,我们将介绍一种有助于避免错误的特定模式,并且该模式可与 C++ FIDL 绑定和组件运行时完美集成。

背景

异步运行时

async 库定义了用于在 Fuchsia 上启动异步操作的接口。它定义了不透明的 async_dispatcher_t 类型及相关函数。

此调度程序接口有几个实现。一种常用的是 async_loop_t 及其 C++ 封装容器 async::Loop。执行异步工作的库通常应该不知道具体实现是什么。而是通过 async_dispatcher_t* 接口调用函数。

线程安全

如有必要,读者应熟悉与线程安全相关的术语。如需了解获得 Google C++ 团队认可的线程安全定义,请参阅 CppCon 2018:Geoffrey Romer“什么是‘线程安全’?”

一个遵循线程安全性的程序可以避免数据争用:大体上讲,读取和写入相同的数据无需在这些操作之间定义顺序(请参阅 C++ 标准中数据争用的精确定义)。这些竞态是错误的来源,因为它们在运行时会导致未定义的行为。

单个 C++ 类型还包含线程安全性方面的分类。引用 abseil 中的常见做法解释:

  • 如果并发使用不会导致数据争用,则 C++ 对象就是线程安全对象。
  • 如有任何并发使用可能会导致数据争用,则 C++ 对象属于线程不安全对象。

可以使用同步基元(例如互斥锁)封装线程不安全类型,使其具有线程安全性。此过程称为添加外部同步。这样做会增加开销,而且并非所有用户都会同时使用该类型。因此,默认情况下,库属于线程安全性问题,并且会根据需要要求用户添加同步。此类类型可能包含如下注释:

// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };

异步代码中的线程安全

由于存在回调,异步代码中的线程安全实现变得更加细微。请参考以下代码段:

// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
 public:
  void Load() {
    reader_.AsyncRead([this] (std::string data) {
      values_ = Parse(data);
    });
  }

  std::vector<std::string> Parse(const std::string& data);

 private:
  FileReader reader_;
  std::vector<std::string> values_;
};

AsyncRead 将在后台完成工作,然后在工作完成时调用指定为回调函数的 lambda。由于 lambda 会捕获 this,因此通常称为“向上调用”:CsvParser 实例拥有的 reader_ 会调用所有者。

我们来考虑一下如何避免此回调与销毁 CsvParser 之间出现争用情况。在 CsvParser 中添加互斥量不会有所帮助,因为如果 CsvParser 销毁,该互斥锁将被销毁。您可能要求 CsvParser 必须始终进行引用计数,但这样会产生主观的 API,并且往往以递归方式导致 CsvParser 引用的所有内容也被计数。

如果我们确保在销毁 CsvParser 和调用回调之间始终存在已定义的排序,便可避免竞态条件。在 Fuchsia 上,回调通常安排在 async_dispatcher_t 对象(简称“调度程序”)上。一种常见的模式是使用单线程调度程序:

  • 使用 async::Loop 作为调度程序实现。
  • 仅运行一个线程来处理循环。
  • 仅销毁该线程上的向上调用目标,并同时取消未来的向上调用。例如,在发布到该调度程序的任务中销毁 CsvParser

由于同一线程会调用异步回调并销毁实例,因此这些操作之间必须有已定义的顺序。

上述模式的一般情况是要确保同步访问:对对象的每次访问(包括构造和销毁)都会观察先前访问的附带效应。在其他有关线程的文献中,您可能会看到“同步访问”一词始终与锁定相关联,例如在访问对象之前执行了互斥锁。如上所述,在 Fuchsia C++ 中,仅使用锁是不够的,我们使用异步调度程序的属性来实现同步访问,这样用户代码就不必使用锁了。下一部分将详细介绍。

同步调度程序

同步调度程序是按顺序运行已发布的任务的调度程序,每个任务都会观察之前任务的副作用。

由于处理异步逻辑的对象是通过调度程序访问,因此也无法从任意线程访问该对象,因为调度程序可能会并发访问同一对象,从而导致数据争用。实际上,必须始终通过与该对象关联的单个调度程序访问该对象。调度程序还必须确保操作之间的排序。我们将此类调度程序称为“同步调度程序”。调度程序可通过两种方法判定为“已同步”:synchronized

支持序列

调度程序可以承诺发布到该调度程序上的任务始终以严格的顺序运行。据说此类调度程序支持序列执行域,即运行一系列任务的顺序执行域,其中一个任务会观察到先前任务的所有附带效应,但底层执行任务可能会从一个线程跳到另一个线程。

同步驱动程序调度程序(例如 fdf::SynchronizedDispatcher)就是一个支持序列的调度程序。请参阅驱动程序线程模型。另一方面,async::Loop 不支持序列,因为用户可能会多次调用 Loop::StartThread 以引入多个线程来争用该循环中的任务执行。

保持单会话

如果调度程序不支持序列,则对于发布到该调度程序的任务上运行的代码,如果该调度程序仅由单个线程提供服务(例如,单线程 async::Loop),则会对运行的代码进行排序。

总而言之,调度程序支持序列,在这种情况下,对象必须在该序列上使用,或者代码在单个调度程序线程上运行,并且对象必须在该线程上使用。用途涵盖构造、销毁和调用实例方法。

同步调度程序是一个并发单元:发布到同一同步调度程序的任务绝不会同时运行。发布到不同同步调度程序的任务可能会彼此并行运行。

检查已同步的调度程序

async 库提供 BasicLockable 类型 async::synchronization_checker。只要函数需要同步访问,您就可以调用 .lock() 或使用 std::lock_guard 锁定检查工具。这样做可以检查函数是否从具有此保证的调度程序调用,而不会实际接受任何锁。如果检查失败,程序将 panic。建议对线程不安全类型使用检查工具检查在运行时是否同步。完整示例如下:

// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
 public:
  ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
      : dispatcher_(dispatcher),
        checker_(dispatcher),
        channel_(std::move(channel)),
        wait_(channel_.get(), ZX_CHANNEL_READABLE) {}

  ~ChannelReader() {
    // |lock| explicitly checks that the dispatcher is not calling callbacks
    // that use this |ChannelReader| instance in the meantime.
    checker_.lock();
  }

  // Asynchronously wait for the channel to become readable, then read the
  // data into a member variable.
  void AsyncRead() {
    // This guard checks that the |AsyncRead| method is called from a task
    // running on a synchronized dispatcher.
    std::lock_guard guard(checker_);

    data_.clear();

    zx_status_t status = wait_.Begin(
        // The dispatcher that will perform the waiting.
        dispatcher_,

        // The async dispatcher will call this callback when the channel is
        // ready to be read from. Because this callback captures `this`, we
        // must ensure the callback does not race with destroying the
        // |ChannelReader| instance. This is accomplished by calling
        // `checker_.lock()` in the |ChannelReader| destructor.
        [this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
               const zx_packet_signal_t* signal) {
          if (status != ZX_OK) {
            return;
          }
          std::lock_guard guard(checker_);

          uint32_t actual;
          data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
          status = channel_.read(0, data_.data(), nullptr,
                                 static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
          if (status != ZX_OK) {
            data_.clear();
            return;
          }
          data_.resize(actual);
        });
    ZX_ASSERT(status == ZX_OK);
  }

  std::vector<uint8_t> data() const {
    // Here we also verify synchronization, because we want to avoid race
    // conditions such as the user calling |AsyncRead| which clears the
    // data and calling |data| to get the data at the same time.
    std::lock_guard guard(checker_);

    return data_;
  }

 private:
  async_dispatcher_t* dispatcher_;
  async::synchronization_checker checker_;
  zx::channel channel_;
  std::vector<uint8_t> data_ __TA_GUARDED(checker_);
  async::WaitOnce wait_;
};

zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));

async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};

ASSERT_EQ(reader.data(), std::vector<uint8_t>{});

const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));

// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();

ASSERT_EQ(reader.data(), kData);

// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();

fidl::Client 是在运行时检查是否同步访问的另一个类型示例:在非调度程序线程上销毁 fidl::Client 会导致 panic。Fuchsia 代码库中还有其他执行相同操作的 C++ 类。他们通常会通过评论强调这一点,例如:

// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };

如需查看有关使用 FIDL 绑定时这种情况的具体讨论,请参阅 C++ FIDL 线程指南

在销毁期间舍弃回调

您可能已经注意到,为了让上述 ChannelReader 示例能够正常运行,当 ChannelReader 被销毁时,必须静默舍弃传递给 wait_.Begin(...) 的回调,而不是调用并返回一些错误。事实上,async::WaitOnce文档中提到过,它会“在超出范围时自动取消等待”。

在销毁期间,某些 C++ 对象会舍弃已注册的回调(如果尚未调用)。据说这些类型的 API 可以保证最多一次传送async::Waitasync::Task 是此类对象的示例。当回调引用拥有等待/任务的单个接收器时(即回调是向上调用),此样式非常实用。这些 API 通常也是线程不安全的,需要前述同步访问

相比之下,即使由回调捕获的对象被销毁,async::PostTask 也可能会调用提供的回调。实际上,除非异步调度程序关闭,否则 async::PostTask 将始终调用所提供的回调。如果您需要发布可以取消的任务,则可以使用 async_patterns::TaskScope:销毁 async_patterns::TaskScope 会舍弃在该作用域上调度的未执行回调。

其他对象始终只会调用一次已注册的回调,即使在销毁期间也是如此。这些调用通常会提供表示取消的错误或状态。据说它们保证恰好一次送达。

使用异步 API 时,应查阅相应的文档,以了解取消语义。

如果您需要使用“正好一次”回调 API,而回调捕获的对象可能会超出范围,请考虑在对象中使用 async_patterns::ReceiverReceiver 允许线程不安全异步对象在超出范围时静默取消定向到它的回调。

使用属于其他已同步调度程序的对象

为了保持同步访问,可以在同一个同步调度程序中管理和使用一组对象。这些对象可以同步调用另一个对象,而不会破坏同步检查。一种特殊情况是应用使用单个线程(通常称为“主线程”)在单个 async::Loop 上运行所有内容。

更复杂的应用可能有多个同步调度程序。当必须从其对应的同步调度程序使用单个对象时,就会出现这样一个问题:如果一个对象与不同的调度程序关联,一个对象如何调用另一个对象?

久经考验的方法就是让对象在彼此之间发送消息,而不是同步调用其实例方法。具体而言,这可能意味着,如果对象 A 需要对对象 B 执行某项操作,A 会将异步任务发布到 B 的调度程序。然后,任务(通常是 lambda 函数)可以同步使用 B,因为它已在 B 的调度程序下运行,并将与使用 B 的其他任务同步。

将任务发布到其他调度程序后,如果接收器对象超出范围,安全地舍弃这些任务会更加困难。下面提供了一些方法:

  • 可以使用 async_patterns::DispatcherBound 拥有并调用位于不同同步调度程序上的子对象。
  • 可以使用 async_patterns::Receiver 让其他对象对其对象进行调用,而不会强制建立所有权关系。如果接收器被销毁,调用将以静默方式取消。
  • 可以引用计数对象,并将弱指针传递给已发布的任务。如果指针过期,已发布的任务不应执行任何操作。

Golang 就是一个热门的示例,它将这一原则融入了他们的语言设计。

历史艺术

确保一组任务接连执行(而不必启动操作系统线程)的轻量级机制是一个周期性主题: