利用多個執行緒編寫正確的非同步程式時,請務必留意 C++。下面說明避免錯誤的特定模式,以及 能與 C++ FIDL 繫結和元件執行階段完美整合。
背景
非同步執行階段
async 程式庫會定義介面
,在 Fuchsia 上啟動非同步作業。定義了不透明
async_dispatcher_t
類型和相關聯的函式。
這個調度工具介面有數種實作。熱門
為 async_loop_t
及其 C++ 包裝函式
async::Loop
。執行非同步工作的程式庫
通常不會知道具體實作內容。而是
會透過 async_dispatcher_t*
介面呼叫函式。
執行緒安全
讀者應熟悉 執行緒安全性。請參閱 CppCon 2018: Geoffrey Romer 「什麼是「執行緒安全」?」,瞭解執行緒安全性的定義, 獲得 Google C++ 團隊背書。
只要能維持執行緒安全性的程式,就能避免發生資料競爭:廣泛、讀取和 這些作業之間沒有指定順序,就寫入相同資料 (請參閱 C++ 標準中資料競爭的精確定義)。這些選舉結果 是錯誤的來源,因為它們會在執行階段中帶來未定義的行為。
個別 C++ 型別也會以執行緒安全為主。轉介 「抽象」的常見實踐解釋:
- 如果並行使用不會導致資料競爭,C++ 物件為執行緒安全狀態。
- 如果任何並行使用都可能導致資料競爭,C++ 物件就會是 thread-unsafe 物件。
其中一個可能會使用同步基元包裝不受執行緒安全的類型,例如互斥鎖 才能確保執行緒安全這就是所謂的新增外部同步處理程序。執行 所以會增加額外負荷,但不是所有使用者都會同時使用該類型。因此 根據預設,程式庫的常見行為對執行緒不安全,而且需要使用者自行新增 這類類型可能包含以下註解:
// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };
非同步程式碼中的執行緒安全性
由於 是否有回呼。請考慮以下程式碼片段:
// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
public:
void Load() {
reader_.AsyncRead([this] (std::string data) {
values_ = Parse(data);
});
}
std::vector<std::string> Parse(const std::string& data);
private:
FileReader reader_;
std::vector<std::string> values_;
};
AsyncRead
會在背景完成作業,然後呼叫 lambda
作業完成後就會指定為回呼函式。由於 lambda
擷取 this
,通常稱為「向上呼叫」:這個 reader_
由 CsvParser
的執行個體所擁有,即可呼叫擁有者。
現在我們來想想如何避免在這次回呼與刪除
CsvParser
。在 CsvParser
中加入 Mutex 並不會有幫助,因為 Mutex
如果 CsvParser
遭到刪除,則會遭到刪除。可能要求 CsvParser
必須
一律計入參考價值,不過這樣會產生明確的 API 並
遞迴讓 CsvParser
參照的所有項目也都能參照
計數。
如果能確定刪除容器之間
CsvParser
以及回呼的叫用,則競爭狀況為
會避開。在 Fuchsia 中,回呼通常是在
async_dispatcher_t
物件 (簡稱為調度器)。常見模式是
使用單一執行緒調度器:
- 使用
async::Loop
做為調度工具實作。 - 僅執行一個執行緒來處理迴圈。
- 只刪除該執行緒上的向上呼叫目標,並取消之後的向上呼叫
例如,在發布至該目錄的工作中刪除
CsvParser
調度器。
由於相同的執行緒會叫用非同步回呼並刪除執行個體, 這些作業之間必須有指定的順序
上述模式的常見情況是確保同步存取權: 物件存取 (包括建構和刪除) 時,就會觀察 對先前存取行為的副作用。在其他有關執行緒的文獻中,您可以 並查看「同步存取權」一詞總是與鎖定有關。例如, 先花 Mutex 鎖定再存取物件在 Fuchsia C++ 中單獨鎖定 光是這樣就不夠充分,因此我們使用非同步屬性。 調度工具 才能進行同步存取權,如此一來,使用者程式碼就不需要 請鎖定。下一節將詳細說明。
同步調度工具
同步處理的調度器是依序執行張貼工作的地方,而每個 工作會觀察先前工作中的副作用。
由於處理非同步邏輯的物件是從調度器存取, 也因為調度工具 可能會同時存取同一個物件,導致資料競爭。於 事實上,您每次必須從 物件調度器也必須確保作業之間的排序。三 呼叫這類調度工具同步調度工具。有兩種方法 調度器符合同步資格:
支援序列
調度員可能會保證張貼在調度器上的工作,一律須採用 以便按照更嚴格的順序進行排序這類調度器表示能支援「序列」:依序 執行網域,以執行一系列任務,其中一項工作會觀察所有任務 但基礎執行作業可能會躍升 其實也很輕鬆
已同步的驅動程式庫調度工具(例如
fdf::SynchronizedDispatcher
) 是調度工具範例
一個支援序列的介面請參閱「驅動程式執行緒模型」。啟用
另一方面,async::Loop
不支援序列,因為使用者可能會呼叫
Loop::StartThread
多次引入多個執行緒
並在這個迴圈中執行工作
保持單一對話串
如果調度工具不支援序列,則程式碼會在
張貼到該調度員的工作時,如果調度員只能使用
單一執行緒,例如單一執行緒 async::Loop
。
總而言之,調度器支援 必須由該序列使用,或者程式碼在單一調度器執行緒上執行 該物件必須在該執行緒上使用使用範圍涵蓋施工 刪除及呼叫執行個體方法
同步調度工具是「並行」的單位:張貼的工作 同一個同步調度工具絕不會同時執行 另一個例子。發布至不同同步調度工具的工作可能會執行 彼此並行
檢查已完成同步的調度工具
async
程式庫提供 BasicLockable
類型。
async::synchronization_checker
。你可以打電話
.lock()
,或在每次函式使用 std::lock_guard
時鎖定檢查工具
需要同步存取權。這麼做會檢查函式呼叫來源
這項保證的調度工具,實際上沒有受到任何鎖定。如果
檢查失敗,程式會發生恐慌。建議您執行緒不安全的類型
執行檢查工具,在執行階段檢查同步處理情形。以下是
範例:
// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
public:
ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
: dispatcher_(dispatcher),
checker_(dispatcher),
channel_(std::move(channel)),
wait_(channel_.get(), ZX_CHANNEL_READABLE) {}
~ChannelReader() {
// |lock| explicitly checks that the dispatcher is not calling callbacks
// that use this |ChannelReader| instance in the meantime.
checker_.lock();
}
// Asynchronously wait for the channel to become readable, then read the
// data into a member variable.
void AsyncRead() {
// This guard checks that the |AsyncRead| method is called from a task
// running on a synchronized dispatcher.
std::lock_guard guard(checker_);
data_.clear();
zx_status_t status = wait_.Begin(
// The dispatcher that will perform the waiting.
dispatcher_,
// The async dispatcher will call this callback when the channel is
// ready to be read from. Because this callback captures `this`, we
// must ensure the callback does not race with destroying the
// |ChannelReader| instance. This is accomplished by calling
// `checker_.lock()` in the |ChannelReader| destructor.
[this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
return;
}
std::lock_guard guard(checker_);
uint32_t actual;
data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
status = channel_.read(0, data_.data(), nullptr,
static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
if (status != ZX_OK) {
data_.clear();
return;
}
data_.resize(actual);
});
ZX_ASSERT(status == ZX_OK);
}
std::vector<uint8_t> data() const {
// Here we also verify synchronization, because we want to avoid race
// conditions such as the user calling |AsyncRead| which clears the
// data and calling |data| to get the data at the same time.
std::lock_guard guard(checker_);
return data_;
}
private:
async_dispatcher_t* dispatcher_;
async::synchronization_checker checker_;
zx::channel channel_;
std::vector<uint8_t> data_ __TA_GUARDED(checker_);
async::WaitOnce wait_;
};
zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};
ASSERT_EQ(reader.data(), std::vector<uint8_t>{});
const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));
// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();
ASSERT_EQ(reader.data(), kData);
// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();
另一個類型範例會檢查 fidl::Client
的同步處理存取權
執行階段:在非調度器執行緒上刪除 fidl::Client
,會導致
恐慌。Fuchsia 程式碼集中還有其他 C++ 類別也有相同的用途。
他們通常會透過註解來強調這一點,例如:
// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };
請參閱 C++ FIDL 執行緒指南中的 即使用 FIDL 繫結來建立模型
在刪除期間捨棄回呼
您可能已註意到,以上述 ChannelReader
範例來說,
傳遞至 wait_.Begin(...)
的回呼必須在通知的情況下遭到捨棄,
呼叫失敗,但當 ChannelReader
遭到刪除。的確是
async::WaitOnce
的 說明文件 中提到「自動」
會取消等候範圍而已。」
在刪除期間,如果發生下列情況,某些 C++ 物件會捨棄已註冊的回呼:
尚未呼叫的容器這類 API 可確保
提交一次。async::Wait
和 async::Task
是這類物件的範例。
當回呼參照擁有的單一接收器時,這個樣式成效良好
等待/工作,也就是回呼是指向上呼叫。這些 API 通常也會
不安全的執行緒,並且需要上述「同步存取權」。
相反地,即使 async::PostTask
也可能呼叫提供的回呼,即使
回呼物件則會遭到刪除。事實上,async::PostTask
會
除非非同步調度器已關閉,否則一律呼叫提供的回呼。
如需張貼訊息,請使用 async_patterns::TaskScope
可以取消的工作:刪除
async_patterns::TaskScope
會捨棄未執行的回呼
取得授權
其他物件一律只會呼叫一次已註冊的回呼,即使 刪除映像檔這些呼叫通常會提供錯誤或狀態 表示取消。換言之,保證「僅放送一次」。
如果使用 非同步 API 瞭解取消語意
如果您需要在物件時使用「僅使用一次」回呼 API
回呼所擷取到的範圍外,請考慮使用
async_patterns::Receiver
。Receiver
可讓您
對執行緒不安全的非同步物件,以無訊息方式取消導向的回呼
即使超出範圍
使用屬於其他同步調度工具的物件
為了維持同步存取權,使用者可在 上管理和使用一組物件
相同的同步調度工具這些物件可以同步呼叫
而且不會破壞同步處理檢查。有一個特殊情況是
在單一 async::Loop
上執行所有內容,且使用單一
通常稱為「主執行緒」
如果是較複雜的應用程式,可能就有多個同步調度工具。時間 個別物件必須從對應的已同步項目中使用 調度器就會產生一些問題: 物件該如何呼叫其他物件 是否有相關聯的調度?
這種限時測試的做法
是讓物件在彼此之間傳送訊息
而不是同步呼叫各自的例項方法真的
可能表示如果物件 A
需要對 B
物件執行操作,A
就會
將非同步工作發布至 B
的調度工具。任務 (通常是 lambda
函式) 可能會同步使用 B
,因為該函式已於下方
B
的調度工具,且將與其他使用 B
的工作同步處理。
將工作發布至其他調度工具時,會較難安全捨棄 這類 Pod 會在接收端物件超出範圍時發出以下提供一些方法:
- 其中一個可能同時使用
async_patterns::DispatcherBound
和 並呼叫儲存在不同同步項目的子項物件 調度器。 - 之一可以使用
async_patterns::Receiver
讓其他物件 呼叫物件,而不必強制建立擁有權關係。呼叫 如果接收器遭刪除,則不會發出通知。 - 一個可以參照物件計數,並將較弱的指標傳送至已張貼的 工作。如果指標已過期,則發布的工作應不執行任何動作。
Golang 是熱門例子,運用這項原則 語言設計。
過往藝術
這種輕量機制,可確保一組工作依序執行 不必啟動作業系統執行緒,是一種反覆執行的主題:
- Chromium 專案定義了類似的序列概念:執行緒和 Chrome 執行工作
- Java Platform 新增了虛擬執行緒。