线程安全异步代码

在编写具有多个线程的正确异步程序时需要小心谨慎 C++。下面我们将介绍有助于避免错误的特定模式,以及 能够与 C++ FIDL 绑定和组件运行时完美集成。

背景

异步运行时

async 库定义了接口 用于在 Fuchsia 上启动异步操作。它定义了一个 async_dispatcher_t 类型以及关联的函数。

此调度程序接口有多个实现。热门 是 async_loop_t 及其 C++ 封装容器 async::Loop。执行异步工作的库 通常不应知道具体实现是什么。相反, 会通过 async_dispatcher_t* 接口调用函数。

线程安全

读者应熟悉 线程安全。请参阅 CppCon 2018:Geoffrey Romer “‘线程安全’是什么意思?” 经过 Google C++ 团队的认可。

一个维护线程安全的程序可避免数据争用:一般来说,读取和 写入相同的数据,但未在这些操作之间定义顺序(请参阅 C++ 标准中对数据争用的精确定义)。这些比赛 都是错误的源头,因为它们会导致运行时出现未定义的行为。

单个 C++ 类型也有围绕线程安全的分类。推荐人 abseil 的常见做法解读:

  • 如果并发使用不会导致数据争用,则 C++ 对象是线程安全的
  • 如果任何并发使用都可能导致数据争用,则 C++ 对象是线程不安全的对象。

例如,您可以使用同步基元封装线程不安全类型,互斥量 使其具备线程安全性这称为添加外部同步。操作 因此会增加开销,并非所有用户都会同时使用该类型。因此, 使库默认为线程不安全,并且要求用户添加 同步。此类类型可能包含如下注释:

// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };

异步代码中的线程安全

由于 是否出现回调。请参考以下代码段:

// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
 public:
  void Load() {
    reader_.AsyncRead([this] (std::string data) {
      values_ = Parse(data);
    });
  }

  std::vector<std::string> Parse(const std::string& data);

 private:
  FileReader reader_;
  std::vector<std::string> values_;
};

AsyncRead 将在后台完成工作,然后调用 lambda 指定为回调函数。由于 lambda 捕获this,通常称为“向上调用”:reader_ 属于 CsvParser 的实例,会调用所有者。

让我们考虑一下如何避免此回调和 CsvParser。在 CsvParser 中添加互斥量不会有所帮助,因为互斥量会导致 在 CsvParser 被销毁时销毁。其中可能会要求CsvParser 始终计算引用数,但这样会导致系统偏向于 API 以递归方式使 CsvParser 引用的所有内容也被引用 。

如果我们确保在销毁 CsvParser 并调用回调函数,则竞态条件会变为 。在 Fuchsia 上,回调通常安排在 async_dispatcher_t 对象(简称为调度程序)。一种常见的模式是 使用单线程调度程序:

  • 使用 async::Loop 作为调度程序实现。
  • 仅运行一个线程来处理循环。
  • 仅销毁该线程上的向上调用目标,并在 。例如,销毁发布到该任务的任务内的 CsvParser 调度程序。

由于同一线程调用异步回调并销毁实例, 这些操作之间必须有定义的顺序

上述模式的一般情况是确保同步访问:每个 对象的访问(包括构建和销毁)将遵循 以及先前访问的副作用。在关于线程处理的其他文献中,您可能 看到术语“同步访问”总是与锁定相关联,例如 在访问对象之前实现互斥锁。在 Fuchsia C++ 中,只使用锁 这还不够,我们使用异步脚本的 调度程序来实现同步访问,这样用户代码就不需要 使用锁。下一部分将进行详细介绍。

同步调度程序

同步调度程序是指按顺序运行已发布的任务, 将观察先前任务的副作用。

由于处理异步逻辑的对象是通过调度程序访问的, 也无法从任意线程访问对象,因为调度程序 可能同时访问同一个对象,从而导致数据争用。在 事实上,必须始终通过与以下单个调度程序相关联的单个调度程序访问该对象: 该对象。调度程序还必须确保操作之间的排序。周三 调用此类调度程序同步调度程序。您可通过两种方式 调度程序来限定为 同步

支持序列

调度程序可以承诺在该调度程序上发布的任务始终以 严格的排序。这类调度程序据说支持 sequences:sequence 执行域,用于运行一系列任务,其中一个任务将观察 但底层执行可能会跳跃 从一个线程到另一个线程。

同步驱动程序调度程序(例如 fdf::SynchronizedDispatcher)就是调度程序的一个示例 模型。请参阅驱动程序线程模型。已开启 另一方面,async::Loop 不支持序列,因为用户可能会调用 Loop::StartThread 来引入多个争用 执行任务。

保持单线程模式

如果调度程序不支持序列, 只有为调度程序提供服务时,对发布到该调度程序的任务进行排序 由单个线程(例如单线程 async::Loop)触发。

总之,调度程序支持序列,在这种情况下,对象 必须对该序列使用,否则代码在单个调度程序线程上运行 并且对象必须在该线程上使用。使用包括施工、 销毁和调用实例方法。

同步调度程序是并发的单位,即已发布的任务 同一个同步调度程序绝不会与一个调度程序同时运行 另一个。发布到不同已同步调度程序的任务有可能会执行 同时进行。

检查是否已同步调度程序

async 库提供 BasicLockable 类型, async::synchronization_checker。您可以拨打 .lock(),或者在每次运行函数时使用 std::lock_guard 锁定检查工具 需要同步访问权限。这样做可以检查该函数是否从 调度程序,无需实际获取任何锁。如果 检查失败,程序会恐慌。建议将线程不安全类型 在运行时检查同步情况。以下是完整的 示例:

// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
 public:
  ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
      : dispatcher_(dispatcher),
        checker_(dispatcher),
        channel_(std::move(channel)),
        wait_(channel_.get(), ZX_CHANNEL_READABLE) {}

  ~ChannelReader() {
    // |lock| explicitly checks that the dispatcher is not calling callbacks
    // that use this |ChannelReader| instance in the meantime.
    checker_.lock();
  }

  // Asynchronously wait for the channel to become readable, then read the
  // data into a member variable.
  void AsyncRead() {
    // This guard checks that the |AsyncRead| method is called from a task
    // running on a synchronized dispatcher.
    std::lock_guard guard(checker_);

    data_.clear();

    zx_status_t status = wait_.Begin(
        // The dispatcher that will perform the waiting.
        dispatcher_,

        // The async dispatcher will call this callback when the channel is
        // ready to be read from. Because this callback captures `this`, we
        // must ensure the callback does not race with destroying the
        // |ChannelReader| instance. This is accomplished by calling
        // `checker_.lock()` in the |ChannelReader| destructor.
        [this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
               const zx_packet_signal_t* signal) {
          if (status != ZX_OK) {
            return;
          }
          std::lock_guard guard(checker_);

          uint32_t actual;
          data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
          status = channel_.read(0, data_.data(), nullptr,
                                 static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
          if (status != ZX_OK) {
            data_.clear();
            return;
          }
          data_.resize(actual);
        });
    ZX_ASSERT(status == ZX_OK);
  }

  std::vector<uint8_t> data() const {
    // Here we also verify synchronization, because we want to avoid race
    // conditions such as the user calling |AsyncRead| which clears the
    // data and calling |data| to get the data at the same time.
    std::lock_guard guard(checker_);

    return data_;
  }

 private:
  async_dispatcher_t* dispatcher_;
  async::synchronization_checker checker_;
  zx::channel channel_;
  std::vector<uint8_t> data_ __TA_GUARDED(checker_);
  async::WaitOnce wait_;
};

zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));

async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};

ASSERT_EQ(reader.data(), std::vector<uint8_t>{});

const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));

// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();

ASSERT_EQ(reader.data(), kData);

// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();

fidl::Client 是检查同步访问路径的另一个类型 运行时:销毁非调度程序线程上的 fidl::Client 会导致 恐慌。Fuchsia 代码库中还有其他 C++ 类也可执行相同的操作。 他们通常会通过评论来突出显示这些信息,如下所示:

// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };

如需查看具体讨论,请参阅 C++ FIDL 线程指南, 这种情况。

在销毁期间舍弃回调

您可能已经注意到,为使上面的 ChannelReader 示例有效, 必须静默地舍弃传递给 wait_.Begin(...) 的回调,而不是 如果 ChannelReader 被销毁,则会调用此错误代码但返回一些错误。事实上, async::WaitOnce上的文档中提到 会在超出范围时取消等待”。

在销毁期间,如果存在以下情况,某些 C++ 对象会舍弃已注册的回调: 尚未调用的方法。据说这些类型的 API 最多能保证 。例如,async::Waitasync::Task 就是此类对象。 当回调引用的单个接收器拥有 等待/任务,即回调是向上调用。这些 API 通常也是 线程不安全,并且需要上述同步访问

相比之下,async::PostTask 可能会调用提供的回调,即使 该回调所捕获的对象将被销毁。事实上,async::PostTask会 除非异步调度程序已关闭,否则始终调用提供的回调。 如需发帖,您可以使用async_patterns::TaskScope 销毁一个可取消的任务: async_patterns::TaskScope 将舍弃未执行的回调 已安排在该范围内投放

其他对象将始终恰好调用注册的回调一次,即使 遭到破坏这些调用通常会返回错误或状态 表示取消。据说,它们可以保证正好一次送达

使用 异步 API 来了解取消语义。

如果您需要在对象使用“正好一次”回调 API 时 回调所捕获的数据可能超出范围,请考虑使用 async_patterns::ReceiverReceiver 允许 线程不安全的异步对象,用于以静默方式取消定向的回调 。

使用属于其他同步调度程序的对象

为了保持访问权限同步,可以在 Google Cloud 上管理和使用 同一个同步调度程序。这些对象可以同步调用 另一个实例,而不会中断同步检查。一种特殊情况是 使用单个 async::Loop 运行所有内容的应用 线程,通常称为“主线程”。

更复杂的应用可能有多个同步调度程序。时间 必须从其对应的同步对象中 那么一个对象又该如何调用另一个对象呢? 与不同的调度程序相关联?

一种经过时间测试的方法是让对象在彼此之间发送消息, 而不是同步调用其实例方法。具体而言, 可能意味着,如果对象 A 需要对对象 B 执行某些操作,A 就会 将异步任务发布到 B 的调度程序。任务(通常是 lambda) 函数),便可同步使用 B,因为它已在 B 的调度程序,并且将与使用 B 的其他任务同步。

将任务发布给其他调度员时,安全地舍弃任务更困难 当接收器对象超出范围时,发出通知。以下是一些方法:

  • 一人可以使用 async_patterns::DispatcherBound 同时拥有 并调用位于不同同步 调度程序。
  • 某个可以使用 async_patterns::Receiver 让其他对象 调用其对象,而不强制建立所有权关系。调用 会在接收器被销毁的情况下静默取消。
  • 一个可能会引用对象计数,然后将一个弱指针传递给发布的 任务。如果指针过期,则发布的任务不应执行任何操作。

Golang 是一个广受欢迎的示例,它将这一原则融入了他们的 语言设计。

早期艺术

轻量级机制,用于确保一组任务接连执行; 无需启动操作系统线程)就是一个反复出现的主题: