執行緒安全非同步程式碼

使用多個執行緒編寫正確的非同步程式時,需要在 C++ 中謹慎操作。以下將說明有助於避免錯誤的特定模式,以及哪些模式能與 C++ FIDL 繫結和元件執行階段完美整合。

背景

非同步執行階段

async 程式庫會定義用於在 Fuchsia 上啟動非同步作業的介面。會定義不透明的 async_dispatcher_t 類型和相關函式。

這個調度工具介面有幾種實作。常見的做法是 async_loop_t 和其 C++ 包裝函式 async::Loop。執行非同步工作的程式庫通常不應知道具體實作方式。這些函式會在 async_dispatcher_t* 介面上呼叫函式。

執行緒安全

如有需要,讀取器應熟悉執行緒安全性的術語。如要瞭解 Google C++ 團隊認可的執行緒安全性定義,請參閱「CppCon 2018:Geoffrey Romer 的意思」

堅持執行緒安全性的程式可避免資料競爭。大範圍,是在作業之間沒有定義順序,直接讀取和寫入相同的資料 (請參閱 C++ 標準的資料競爭精確定義)。這些競爭會導致執行階段產生未定義的行為,因此是錯誤來源。

個別 C++ 類型也會針對執行緒安全性進行分類。參照 abseil 的常見做法解釋:

  • 如果並行使用不會導致資料競爭,C++ 物件就是「執行緒安全」
  • 如果任何並行使用可能會導致資料競爭,C++ 物件就是 thread-unsafe

一種可以使用同步處理基元包裝執行緒不安全的類型,例如互斥鎖讓執行緒安全。這就是所謂的新增外部同步處理功能。這樣做會增加負擔,而且不是所有使用者都會同時使用該類型。因此,程式庫預設不會危害執行緒安全,且使用者必須視需要新增同步處理。這類留言可能如下所示:

// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };

非同步程式碼中的執行緒安全性

由於有回呼,因此在非同步程式碼中,確保執行緒安全的重要性會更明顯。請參考以下程式碼片段:

// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
 public:
  void Load() {
    reader_.AsyncRead([this] (std::string data) {
      values_ = Parse(data);
    });
  }

  std::vector<std::string> Parse(const std::string& data);

 private:
  FileReader reader_;
  std::vector<std::string> values_;
};

AsyncRead 會在背景完成工作,然後在工作完成後呼叫指定為回呼函式的 lambda。由於 lambda 會擷取 this,因此通常稱為「啟動呼叫」:由 CsvParser 執行個體擁有的 reader_ 會呼叫擁有者。

讓我們想想如何避免這個回呼與 CsvParser 的刪除之間發生競爭問題。在 CsvParser 中新增 Mutex 無法解決問題,因為 CsvParser 遭到刪除時,Mutex 就會遭到刪除。一種可能規定 CsvParser 必須一律納入參照,但這會產生一個主觀的 API,並以遞迴方式導致 CsvParser 參照的所有內容一併計算。

如果能確保刪除 CsvParser 與叫用回呼之間一律有定義的順序,就不會出現競爭狀況。在 Fuchsia 上,回呼通常是在 async_dispatcher_t 物件上安排 (短期的「調度工具」)。常見的模式是使用單一執行緒調度器:

  • 使用 async::Loop 做為調度工具實作。
  • 只執行一個執行緒來服務迴圈。
  • 請只刪除該執行緒上的升級呼叫目標,並同時取消後續的更新呼叫。例如,在發布至該調度工具的工作中刪除 CsvParser

由於同一個執行緒會叫用非同步回呼並刪除執行個體,因此必須在這些作業之間定義順序。

上述模式的常見案例是確保同步存取:物件每次存取 (包括建構和刪除) 都會觀察先前存取的副作用。在其他有關執行緒的文獻中,您可能會看到「同步存取權」一詞一直與鎖定有關,例如在存取物件前進行互斥鎖。在 Fuchsia C++ 中,單獨鎖定鎖定的功能不夠如上所述,而我們使用非同步調度器的屬性來實現同步存取權,這樣使用者程式碼就不必採取鎖定。下一節將詳細說明。

已同步的調度工具

已同步的調度工具是會依序執行張貼工作,且每個工作會觀察先前工作帶來的副作用。

由於處理非同步邏輯的物件可從調度器存取,因此無法從任意執行緒存取該物件,因為調度器可能會同時存取同一個物件,導致資料競爭。事實上,必須一律從與該物件相關聯的單一調度器存取該物件。調度器也必須確保作業之間的排序。我們稱這類調度工具是「同步調度工具」。調度器可以透過兩種方式通過驗證,稱為「已同步處理」synchronized

支援序列

調度器可能會承諾,調度器上張貼的工作一律以嚴格順序執行。據說這類調度工具支援「序列」:依序執行網域會執行一系列工作,其中一項工作會觀察先前工作的所有副作用,但基礎執行作業可能會從一個執行緒跳到另一個執行緒。

已同步的驅動程式庫程式調度工具 (例如 fdf::SynchronizedDispatcher) 是支援序列的調度工具範例。請參閱驅動程式執行緒模型。不過,async::Loop 不支援序列,因為使用者可能會多次呼叫 Loop::StartThread,引入多個競爭,以在該迴圈中執行任務。

保持單一討論串

如果調度器不支援序列,則如果調度工具只由單一執行緒 (例如單一執行緒 async::Loop) 提供服務,則發布至該調度工具的工作所執行的程式碼就會排序。

總結來說,調度工具支援序列,在此情況下,物件必須用於該序列,或在單一調度工具執行緒上執行程式碼,且必須在該執行緒上使用物件。使用涵蓋建構、刪除和呼叫例項方法,

同步的調度工具是「並行」單位:張貼到同一個同步調度器的工作永遠不會並行執行。張貼到不同同步調度工具的工作可能會並行執行。

檢查已同步的調度工具

async 程式庫提供 BasicLockable 類型 async::synchronization_checker。每當函式需要同步存取權時,您就可以呼叫 .lock() 或使用 std::lock_guard 鎖定檢查工具。這樣做會檢查透過如此保證的調度工具呼叫函式,而不會實際執行任何鎖定。如果檢查失敗,程式會恐慌。建議透過執行檢查工具,在執行階段檢查執行緒不安全的類型。以下是完整範例:

// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
 public:
  ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
      : dispatcher_(dispatcher),
        checker_(dispatcher),
        channel_(std::move(channel)),
        wait_(channel_.get(), ZX_CHANNEL_READABLE) {}

  ~ChannelReader() {
    // |lock| explicitly checks that the dispatcher is not calling callbacks
    // that use this |ChannelReader| instance in the meantime.
    checker_.lock();
  }

  // Asynchronously wait for the channel to become readable, then read the
  // data into a member variable.
  void AsyncRead() {
    // This guard checks that the |AsyncRead| method is called from a task
    // running on a synchronized dispatcher.
    std::lock_guard guard(checker_);

    data_.clear();

    zx_status_t status = wait_.Begin(
        // The dispatcher that will perform the waiting.
        dispatcher_,

        // The async dispatcher will call this callback when the channel is
        // ready to be read from. Because this callback captures `this`, we
        // must ensure the callback does not race with destroying the
        // |ChannelReader| instance. This is accomplished by calling
        // `checker_.lock()` in the |ChannelReader| destructor.
        [this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
               const zx_packet_signal_t* signal) {
          if (status != ZX_OK) {
            return;
          }
          std::lock_guard guard(checker_);

          uint32_t actual;
          data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
          status = channel_.read(0, data_.data(), nullptr,
                                 static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
          if (status != ZX_OK) {
            data_.clear();
            return;
          }
          data_.resize(actual);
        });
    ZX_ASSERT(status == ZX_OK);
  }

  std::vector<uint8_t> data() const {
    // Here we also verify synchronization, because we want to avoid race
    // conditions such as the user calling |AsyncRead| which clears the
    // data and calling |data| to get the data at the same time.
    std::lock_guard guard(checker_);

    return data_;
  }

 private:
  async_dispatcher_t* dispatcher_;
  async::synchronization_checker checker_;
  zx::channel channel_;
  std::vector<uint8_t> data_ __TA_GUARDED(checker_);
  async::WaitOnce wait_;
};

zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));

async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};

ASSERT_EQ(reader.data(), std::vector<uint8_t>{});

const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));

// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();

ASSERT_EQ(reader.data(), kData);

// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();

fidl::Client 是另一個在執行階段檢查同步存取權的類型範例:在非調度工具執行緒中刪除 fidl::Client 會導致恐慌。Fuchsia 程式碼集還有其他 C++ 類別也執行同樣的操作。他們通常會以以下留言加強宣傳效果:

// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };

如需使用 FIDL 繫結時情境的具體討論,請參閱 C++ FIDL 執行緒指南

在刪除期間捨棄回呼

您可能已註意到,在上述 ChannelReader 範例中,傳遞至 wait_.Begin(...) 的回呼必須無聲捨棄,而非在 ChannelReader 刪除時呼叫某些錯誤。的 async::WaitOnce 相關說明文件表示「在超出範圍時自動取消等待」。

在刪除期間,部分 C++ 物件會捨棄已註冊的回呼 (如果尚未呼叫這些回呼)。系統最多會提交一次這類 API。async::Waitasync::Task 是這類物件的範例。當回呼參照擁有等候/工作的單一接收器 (例如回呼是向上呼叫) 時,這個樣式就非常實用。這些 API 通常也具有執行緒不安全,且需要上述同步存取權

相較之下,即使回呼擷取的物件已刪除,async::PostTask 仍可能會呼叫提供的回呼。事實上,除非非同步調度工具關閉,否則 async::PostTask 一律會呼叫所提供的回呼。如果您需要發布可以取消的工作,則可使用 async_patterns::TaskScope:刪除 async_patterns::TaskScope 會捨棄該範圍內安排的未執行回呼。

其他物件在刪除期間一律只會呼叫一次已註冊的回呼。這些呼叫通常會提供錯誤或狀態,指出取消動作。據說保證「只會傳送一次」

在使用非同步 API 瞭解取消語意時,應參閱對應說明文件。

如果您需要使用「僅一次」回呼 API,而回呼擷取的物件可能會超出範圍,請考慮在物件中使用 async_patterns::ReceiverReceiver 允許執行緒不安全的非同步物件在離開範圍時,以無訊息方式取消導向至它的回呼。

使用屬於不同同步調度工具的物件

為了維持同步存取權,您可以在同一個同步的調度工具上管理及使用一組物件。這些物件可以同步呼叫另一個物件,而不會破壞同步檢查。比如,應用程式以單一執行緒在單一 async::Loop 上執行所有內容,通常稱為「主執行緒」。

較複雜的應用程式可能會有多個同步的調度工具。如果必須從對應的同步調度工具使用個別物件,會發生下列問題:如果某個物件與不同調度器相關聯,該物件要如何呼叫另一個物件?

一種經過時間測試的方法,是讓物件在彼此之間傳送訊息,而不是同步呼叫執行個體方法。具體而言,這可能表示如果物件 A 需要對物件 B 執行某項操作,A 就會發布非同步工作至 B 的調度工具。工作 (通常是 lambda 函式) 可能會同步使用 B,因為此工作已在 B 的調度工具下執行,且會與使用 B 的其他工作保持同步。

將工作發布至其他調度工具時,當接收器物件超出範圍時,您就較難安全地捨棄工作。以下是一些做法:

  • 您可以用 async_patterns::DispatcherBound 擁有並呼叫位於不同同步調度器上的子項物件。
  • 可以使用 async_patterns::Receiver,讓其他物件對其物件發出呼叫,而不用強制具有擁有權關係。如果接收器遭到刪除,則呼叫會在不顯示通知的情況下取消。
  • 其中一個可能會參照物件計數,並將弱指標傳送至已發布的工作。如果指標已過期,已張貼的工作應不會執行任何動作。

Golang 是相當熱門的範例,在其語言設計中納入這項原則。

既有藝術

這類輕量機制就是一個週期性的主題,可確保一組工作依序執行,不必啟動作業系統執行緒: