執行緒安全非同步程式碼

利用多個執行緒編寫正確的非同步程式時,請務必留意 C++。下面說明避免錯誤的特定模式,以及 能與 C++ FIDL 繫結和元件執行階段完美整合。

背景

非同步執行階段

async 程式庫會定義介面 ,在 Fuchsia 上啟動非同步作業。定義了不透明 async_dispatcher_t 類型和相關聯的函式。

這個調度工具介面有數種實作。熱門 為 async_loop_t 及其 C++ 包裝函式 async::Loop。執行非同步工作的程式庫 通常不會知道具體實作內容。而是 會透過 async_dispatcher_t* 介面呼叫函式。

執行緒安全

讀者應熟悉 執行緒安全性。請參閱 CppCon 2018: Geoffrey Romer 「什麼是「執行緒安全」?」,瞭解執行緒安全性的定義, 獲得 Google C++ 團隊背書。

只要能維持執行緒安全性的程式,就能避免發生資料競爭:廣泛、讀取和 這些作業之間沒有指定順序,就寫入相同資料 (請參閱 C++ 標準中資料競爭的精確定義)。這些選舉結果 是錯誤的來源,因為它們會在執行階段中帶來未定義的行為。

個別 C++ 型別也會以執行緒安全為主。轉介 「抽象」的常見實踐解釋:

  • 如果並行使用不會導致資料競爭,C++ 物件為執行緒安全狀態。
  • 如果任何並行使用都可能導致資料競爭,C++ 物件就會是 thread-unsafe 物件。

其中一個可能會使用同步基元包裝不受執行緒安全的類型,例如互斥鎖 才能確保執行緒安全這就是所謂的新增外部同步處理程序。執行 所以會增加額外負荷,但不是所有使用者都會同時使用該類型。因此 根據預設,程式庫的常見行為對執行緒不安全,而且需要使用者自行新增 這類類型可能包含以下註解:

// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };

非同步程式碼中的執行緒安全性

由於 是否有回呼。請考慮以下程式碼片段:

// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
 public:
  void Load() {
    reader_.AsyncRead([this] (std::string data) {
      values_ = Parse(data);
    });
  }

  std::vector<std::string> Parse(const std::string& data);

 private:
  FileReader reader_;
  std::vector<std::string> values_;
};

AsyncRead 會在背景完成作業,然後呼叫 lambda 作業完成後就會指定為回呼函式。由於 lambda 擷取 this,通常稱為「向上呼叫」:這個 reader_CsvParser 的執行個體所擁有,即可呼叫擁有者。

現在我們來想想如何避免在這次回呼與刪除 CsvParser。在 CsvParser 中加入 Mutex 並不會有幫助,因為 Mutex 如果 CsvParser 遭到刪除,則會遭到刪除。可能要求 CsvParser 必須 一律計入參考價值,不過這樣會產生明確的 API 並 遞迴讓 CsvParser 參照的所有項目也都能參照 計數。

如果能確定刪除容器之間 CsvParser 以及回呼的叫用,則競爭狀況為 會避開。在 Fuchsia 中,回呼通常是在 async_dispatcher_t 物件 (簡稱為調度器)。常見模式是 使用單一執行緒調度器:

  • 使用 async::Loop 做為調度工具實作。
  • 僅執行一個執行緒來處理迴圈。
  • 只刪除該執行緒上的向上呼叫目標,並取消之後的向上呼叫 例如,在發布至該目錄的工作中刪除 CsvParser 調度器。

由於相同的執行緒會叫用非同步回呼並刪除執行個體, 這些作業之間必須有指定的順序

上述模式的常見情況是確保同步存取權: 物件存取 (包括建構和刪除) 時,就會觀察 對先前存取行為的副作用。在其他有關執行緒的文獻中,您可以 並查看「同步存取權」一詞總是與鎖定有關。例如, 先花 Mutex 鎖定再存取物件在 Fuchsia C++ 中單獨鎖定 光是這樣就不夠充分,因此我們使用非同步屬性。 調度工具 才能進行同步存取權,如此一來,使用者程式碼就不需要 請鎖定。下一節將詳細說明。

同步調度工具

同步處理的調度器是依序執行張貼工作的地方,而每個 工作會觀察先前工作中的副作用。

由於處理非同步邏輯的物件是從調度器存取, 也因為調度工具 可能會同時存取同一個物件,導致資料競爭。於 事實上,您每次必須從 物件調度器也必須確保作業之間的排序。三 呼叫這類調度工具同步調度工具。有兩種方法 調度器符合同步資格:

支援序列

調度員可能會保證張貼在調度器上的工作,一律須採用 以便按照更嚴格的順序進行排序這類調度器表示能支援「序列」:依序 執行網域,以執行一系列任務,其中一項工作會觀察所有任務 但基礎執行作業可能會躍升 其實也很輕鬆

已同步的驅動程式庫調度工具(例如 fdf::SynchronizedDispatcher) 是調度工具範例 一個支援序列的介面請參閱「驅動程式執行緒模型」。啟用 另一方面,async::Loop 不支援序列,因為使用者可能會呼叫 Loop::StartThread 多次引入多個執行緒 並在這個迴圈中執行工作

保持單一對話串

如果調度工具不支援序列,則程式碼會在 張貼到該調度員的工作時,如果調度員只能使用 單一執行緒,例如單一執行緒 async::Loop

總而言之,調度器支援 必須由該序列使用,或者程式碼在單一調度器執行緒上執行 該物件必須在該執行緒上使用使用範圍涵蓋施工 刪除及呼叫執行個體方法

同步調度工具是「並行」的單位:張貼的工作 同一個同步調度工具絕不會同時執行 另一個例子。發布至不同同步調度工具的工作可能會執行 彼此並行

檢查已完成同步的調度工具

async 程式庫提供 BasicLockable 類型。 async::synchronization_checker。你可以打電話 .lock(),或在每次函式使用 std::lock_guard 時鎖定檢查工具 需要同步存取權。這麼做會檢查函式呼叫來源 這項保證的調度工具,實際上沒有受到任何鎖定。如果 檢查失敗,程式會發生恐慌。建議您執行緒不安全的類型 執行檢查工具,在執行階段檢查同步處理情形。以下是 範例:

// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
 public:
  ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
      : dispatcher_(dispatcher),
        checker_(dispatcher),
        channel_(std::move(channel)),
        wait_(channel_.get(), ZX_CHANNEL_READABLE) {}

  ~ChannelReader() {
    // |lock| explicitly checks that the dispatcher is not calling callbacks
    // that use this |ChannelReader| instance in the meantime.
    checker_.lock();
  }

  // Asynchronously wait for the channel to become readable, then read the
  // data into a member variable.
  void AsyncRead() {
    // This guard checks that the |AsyncRead| method is called from a task
    // running on a synchronized dispatcher.
    std::lock_guard guard(checker_);

    data_.clear();

    zx_status_t status = wait_.Begin(
        // The dispatcher that will perform the waiting.
        dispatcher_,

        // The async dispatcher will call this callback when the channel is
        // ready to be read from. Because this callback captures `this`, we
        // must ensure the callback does not race with destroying the
        // |ChannelReader| instance. This is accomplished by calling
        // `checker_.lock()` in the |ChannelReader| destructor.
        [this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
               const zx_packet_signal_t* signal) {
          if (status != ZX_OK) {
            return;
          }
          std::lock_guard guard(checker_);

          uint32_t actual;
          data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
          status = channel_.read(0, data_.data(), nullptr,
                                 static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
          if (status != ZX_OK) {
            data_.clear();
            return;
          }
          data_.resize(actual);
        });
    ZX_ASSERT(status == ZX_OK);
  }

  std::vector<uint8_t> data() const {
    // Here we also verify synchronization, because we want to avoid race
    // conditions such as the user calling |AsyncRead| which clears the
    // data and calling |data| to get the data at the same time.
    std::lock_guard guard(checker_);

    return data_;
  }

 private:
  async_dispatcher_t* dispatcher_;
  async::synchronization_checker checker_;
  zx::channel channel_;
  std::vector<uint8_t> data_ __TA_GUARDED(checker_);
  async::WaitOnce wait_;
};

zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));

async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};

ASSERT_EQ(reader.data(), std::vector<uint8_t>{});

const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));

// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();

ASSERT_EQ(reader.data(), kData);

// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();

另一個類型範例會檢查 fidl::Client 的同步處理存取權 執行階段:在非調度器執行緒上刪除 fidl::Client,會導致 恐慌。Fuchsia 程式碼集中還有其他 C++ 類別也有相同的用途。 他們通常會透過註解來強調這一點,例如:

// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };

請參閱 C++ FIDL 執行緒指南中的 即使用 FIDL 繫結來建立模型

在刪除期間捨棄回呼

您可能已註意到,以上述 ChannelReader 範例來說, 傳遞至 wait_.Begin(...) 的回呼必須在通知的情況下遭到捨棄, 呼叫失敗,但當 ChannelReader 遭到刪除。的確是 async::WaitOnce說明文件 中提到「自動」 會取消等候範圍而已。」

在刪除期間,如果發生下列情況,某些 C++ 物件會捨棄已註冊的回呼: 尚未呼叫的容器這類 API 可確保 提交一次async::Waitasync::Task 是這類物件的範例。 當回呼參照擁有的單一接收器時,這個樣式成效良好 等待/工作,也就是回呼是指向上呼叫。這些 API 通常也會 不安全的執行緒,並且需要上述「同步存取權」

相反地,即使 async::PostTask 也可能呼叫提供的回呼,即使 回呼物件則會遭到刪除。事實上,async::PostTask會 除非非同步調度器已關閉,否則一律呼叫提供的回呼。 如需張貼訊息,請使用 async_patterns::TaskScope 可以取消的工作:刪除 async_patterns::TaskScope 會捨棄未執行的回呼 取得授權

其他物件一律只會呼叫一次已註冊的回呼,即使 刪除映像檔這些呼叫通常會提供錯誤或狀態 表示取消。換言之,保證「僅放送一次」

如果使用 非同步 API 瞭解取消語意

如果您需要在物件時使用「僅使用一次」回呼 API 回呼所擷取到的範圍外,請考慮使用 async_patterns::ReceiverReceiver 可讓您 對執行緒不安全的非同步物件,以無訊息方式取消導向的回呼 即使超出範圍

使用屬於其他同步調度工具的物件

為了維持同步存取權,使用者可在 上管理和使用一組物件 相同的同步調度工具這些物件可以同步呼叫 而且不會破壞同步處理檢查。有一個特殊情況是 在單一 async::Loop 上執行所有內容,且使用單一 通常稱為「主執行緒」

如果是較複雜的應用程式,可能就有多個同步調度工具。時間 個別物件必須從對應的已同步項目中使用 調度器就會產生一些問題: 物件該如何呼叫其他物件 是否有相關聯的調度?

這種限時測試的做法 是讓物件在彼此之間傳送訊息 而不是同步呼叫各自的例項方法真的 可能表示如果物件 A 需要對 B 物件執行操作,A 就會 將非同步工作發布至 B 的調度工具。任務 (通常是 lambda 函式) 可能會同步使用 B,因為該函式已於下方 B 的調度工具,且將與其他使用 B 的工作同步處理。

將工作發布至其他調度工具時,會較難安全捨棄 這類 Pod 會在接收端物件超出範圍時發出以下提供一些方法:

  • 其中一個可能同時使用 async_patterns::DispatcherBound 和 並呼叫儲存在不同同步項目的子項物件 調度器。
  • 之一可以使用 async_patterns::Receiver 讓其他物件 呼叫物件,而不必強制建立擁有權關係。呼叫 如果接收器遭刪除,則不會發出通知。
  • 一個可以參照物件計數,並將較弱的指標傳送至已張貼的 工作。如果指標已過期,則發布的工作應不執行任何動作。

Golang 是熱門例子,運用這項原則 語言設計。

過往藝術

這種輕量機制,可確保一組工作依序執行 不必啟動作業系統執行緒,是一種反覆執行的主題: