使用多個執行緒編寫正確的非同步程式時,需要在 C++ 中謹慎操作。以下將說明有助於避免錯誤的特定模式,以及哪些模式能與 C++ FIDL 繫結和元件執行階段完美整合。
背景
非同步執行階段
async 程式庫會定義用於在 Fuchsia 上啟動非同步作業的介面。會定義不透明的 async_dispatcher_t
類型和相關函式。
這個調度工具介面有幾種實作。常見的做法是 async_loop_t
和其 C++ 包裝函式 async::Loop
。執行非同步工作的程式庫通常不應知道具體實作方式。這些函式會在 async_dispatcher_t*
介面上呼叫函式。
執行緒安全
如有需要,讀取器應熟悉執行緒安全性的術語。如要瞭解 Google C++ 團隊認可的執行緒安全性定義,請參閱「CppCon 2018:Geoffrey Romer 的意思」,
堅持執行緒安全性的程式可避免資料競爭。大範圍,是在作業之間沒有定義順序,直接讀取和寫入相同的資料 (請參閱 C++ 標準的資料競爭精確定義)。這些競爭會導致執行階段產生未定義的行為,因此是錯誤來源。
個別 C++ 類型也會針對執行緒安全性進行分類。參照 abseil 的常見做法解釋:
- 如果並行使用不會導致資料競爭,C++ 物件就是「執行緒安全」。
- 如果任何並行使用可能會導致資料競爭,C++ 物件就是 thread-unsafe。
一種可以使用同步處理基元包裝執行緒不安全的類型,例如互斥鎖讓執行緒安全。這就是所謂的新增外部同步處理功能。這樣做會增加負擔,而且不是所有使用者都會同時使用該類型。因此,程式庫預設不會危害執行緒安全,且使用者必須視需要新增同步處理。這類留言可能如下所示:
// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };
非同步程式碼中的執行緒安全性
由於有回呼,因此在非同步程式碼中,確保執行緒安全的重要性會更明顯。請參考以下程式碼片段:
// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
public:
void Load() {
reader_.AsyncRead([this] (std::string data) {
values_ = Parse(data);
});
}
std::vector<std::string> Parse(const std::string& data);
private:
FileReader reader_;
std::vector<std::string> values_;
};
AsyncRead
會在背景完成工作,然後在工作完成後呼叫指定為回呼函式的 lambda。由於 lambda 會擷取 this
,因此通常稱為「啟動呼叫」:由 CsvParser
執行個體擁有的 reader_
會呼叫擁有者。
讓我們想想如何避免這個回呼與 CsvParser
的刪除之間發生競爭問題。在 CsvParser
中新增 Mutex 無法解決問題,因為 CsvParser
遭到刪除時,Mutex 就會遭到刪除。一種可能規定 CsvParser
必須一律納入參照,但這會產生一個主觀的 API,並以遞迴方式導致 CsvParser
參照的所有內容一併計算。
如果能確保刪除 CsvParser
與叫用回呼之間一律有定義的順序,就不會出現競爭狀況。在 Fuchsia 上,回呼通常是在 async_dispatcher_t
物件上安排 (短期的「調度工具」)。常見的模式是使用單一執行緒調度器:
- 使用
async::Loop
做為調度工具實作。 - 只執行一個執行緒來服務迴圈。
- 請只刪除該執行緒上的升級呼叫目標,並同時取消後續的更新呼叫。例如,在發布至該調度工具的工作中刪除
CsvParser
。
由於同一個執行緒會叫用非同步回呼並刪除執行個體,因此必須在這些作業之間定義順序。
上述模式的常見案例是確保同步存取:物件每次存取 (包括建構和刪除) 都會觀察先前存取的副作用。在其他有關執行緒的文獻中,您可能會看到「同步存取權」一詞一直與鎖定有關,例如在存取物件前進行互斥鎖。在 Fuchsia C++ 中,單獨鎖定鎖定的功能不夠如上所述,而我們使用非同步調度器的屬性來實現同步存取權,這樣使用者程式碼就不必採取鎖定。下一節將詳細說明。
已同步的調度工具
已同步的調度工具是會依序執行張貼工作,且每個工作會觀察先前工作帶來的副作用。
由於處理非同步邏輯的物件可從調度器存取,因此無法從任意執行緒存取該物件,因為調度器可能會同時存取同一個物件,導致資料競爭。事實上,必須一律從與該物件相關聯的單一調度器存取該物件。調度器也必須確保作業之間的排序。我們稱這類調度工具是「同步調度工具」。調度器可以透過兩種方式通過驗證,稱為「已同步處理」synchronized:
支援序列
調度器可能會承諾,調度器上張貼的工作一律以嚴格順序執行。據說這類調度工具支援「序列」:依序執行網域會執行一系列工作,其中一項工作會觀察先前工作的所有副作用,但基礎執行作業可能會從一個執行緒跳到另一個執行緒。
已同步的驅動程式庫程式調度工具 (例如 fdf::SynchronizedDispatcher
) 是支援序列的調度工具範例。請參閱驅動程式執行緒模型。不過,async::Loop
不支援序列,因為使用者可能會多次呼叫 Loop::StartThread
,引入多個競爭,以在該迴圈中執行任務。
保持單一討論串
如果調度器不支援序列,則如果調度工具只由單一執行緒 (例如單一執行緒 async::Loop
) 提供服務,則發布至該調度工具的工作所執行的程式碼就會排序。
總結來說,調度工具支援序列,在此情況下,物件必須用於該序列,或在單一調度工具執行緒上執行程式碼,且必須在該執行緒上使用物件。使用涵蓋建構、刪除和呼叫例項方法,
同步的調度工具是「並行」單位:張貼到同一個同步調度器的工作永遠不會並行執行。張貼到不同同步調度工具的工作可能會並行執行。
檢查已同步的調度工具
async
程式庫提供 BasicLockable
類型 async::synchronization_checker
。每當函式需要同步存取權時,您就可以呼叫 .lock()
或使用 std::lock_guard
鎖定檢查工具。這樣做會檢查透過如此保證的調度工具呼叫函式,而不會實際執行任何鎖定。如果檢查失敗,程式會恐慌。建議透過執行檢查工具,在執行階段檢查執行緒不安全的類型。以下是完整範例:
// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
public:
ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
: dispatcher_(dispatcher),
checker_(dispatcher),
channel_(std::move(channel)),
wait_(channel_.get(), ZX_CHANNEL_READABLE) {}
~ChannelReader() {
// |lock| explicitly checks that the dispatcher is not calling callbacks
// that use this |ChannelReader| instance in the meantime.
checker_.lock();
}
// Asynchronously wait for the channel to become readable, then read the
// data into a member variable.
void AsyncRead() {
// This guard checks that the |AsyncRead| method is called from a task
// running on a synchronized dispatcher.
std::lock_guard guard(checker_);
data_.clear();
zx_status_t status = wait_.Begin(
// The dispatcher that will perform the waiting.
dispatcher_,
// The async dispatcher will call this callback when the channel is
// ready to be read from. Because this callback captures `this`, we
// must ensure the callback does not race with destroying the
// |ChannelReader| instance. This is accomplished by calling
// `checker_.lock()` in the |ChannelReader| destructor.
[this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
return;
}
std::lock_guard guard(checker_);
uint32_t actual;
data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
status = channel_.read(0, data_.data(), nullptr,
static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
if (status != ZX_OK) {
data_.clear();
return;
}
data_.resize(actual);
});
ZX_ASSERT(status == ZX_OK);
}
std::vector<uint8_t> data() const {
// Here we also verify synchronization, because we want to avoid race
// conditions such as the user calling |AsyncRead| which clears the
// data and calling |data| to get the data at the same time.
std::lock_guard guard(checker_);
return data_;
}
private:
async_dispatcher_t* dispatcher_;
async::synchronization_checker checker_;
zx::channel channel_;
std::vector<uint8_t> data_ __TA_GUARDED(checker_);
async::WaitOnce wait_;
};
zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};
ASSERT_EQ(reader.data(), std::vector<uint8_t>{});
const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));
// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();
ASSERT_EQ(reader.data(), kData);
// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();
fidl::Client
是另一個在執行階段檢查同步存取權的類型範例:在非調度工具執行緒中刪除 fidl::Client
會導致恐慌。Fuchsia 程式碼集還有其他 C++ 類別也執行同樣的操作。他們通常會以以下留言加強宣傳效果:
// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };
如需使用 FIDL 繫結時情境的具體討論,請參閱 C++ FIDL 執行緒指南。
在刪除期間捨棄回呼
您可能已註意到,在上述 ChannelReader
範例中,傳遞至 wait_.Begin(...)
的回呼必須無聲捨棄,而非在 ChannelReader
刪除時呼叫某些錯誤。的 async::WaitOnce
相關說明文件表示「在超出範圍時自動取消等待」。
在刪除期間,部分 C++ 物件會捨棄已註冊的回呼 (如果尚未呼叫這些回呼)。系統最多會提交一次這類 API。async::Wait
和 async::Task
是這類物件的範例。當回呼參照擁有等候/工作的單一接收器 (例如回呼是向上呼叫) 時,這個樣式就非常實用。這些 API 通常也具有執行緒不安全,且需要上述同步存取權。
相較之下,即使回呼擷取的物件已刪除,async::PostTask
仍可能會呼叫提供的回呼。事實上,除非非同步調度工具關閉,否則 async::PostTask
一律會呼叫所提供的回呼。如果您需要發布可以取消的工作,則可使用 async_patterns::TaskScope
:刪除 async_patterns::TaskScope
會捨棄該範圍內安排的未執行回呼。
其他物件在刪除期間一律只會呼叫一次已註冊的回呼。這些呼叫通常會提供錯誤或狀態,指出取消動作。據說保證「只會傳送一次」。
在使用非同步 API 瞭解取消語意時,應參閱對應說明文件。
如果您需要使用「僅一次」回呼 API,而回呼擷取的物件可能會超出範圍,請考慮在物件中使用 async_patterns::Receiver
。Receiver
允許執行緒不安全的非同步物件在離開範圍時,以無訊息方式取消導向至它的回呼。
使用屬於不同同步調度工具的物件
為了維持同步存取權,您可以在同一個同步的調度工具上管理及使用一組物件。這些物件可以同步呼叫另一個物件,而不會破壞同步檢查。比如,應用程式以單一執行緒在單一 async::Loop
上執行所有內容,通常稱為「主執行緒」。
較複雜的應用程式可能會有多個同步的調度工具。如果必須從對應的同步調度工具使用個別物件,會發生下列問題:如果某個物件與不同調度器相關聯,該物件要如何呼叫另一個物件?
一種經過時間測試的方法,是讓物件在彼此之間傳送訊息,而不是同步呼叫執行個體方法。具體而言,這可能表示如果物件 A
需要對物件 B
執行某項操作,A
就會發布非同步工作至 B
的調度工具。工作 (通常是 lambda 函式) 可能會同步使用 B
,因為此工作已在 B
的調度工具下執行,且會與使用 B
的其他工作保持同步。
將工作發布至其他調度工具時,當接收器物件超出範圍時,您就較難安全地捨棄工作。以下是一些做法:
- 您可以用
async_patterns::DispatcherBound
擁有並呼叫位於不同同步調度器上的子項物件。 - 可以使用
async_patterns::Receiver
,讓其他物件對其物件發出呼叫,而不用強制具有擁有權關係。如果接收器遭到刪除,則呼叫會在不顯示通知的情況下取消。 - 其中一個可能會參照物件計數,並將弱指標傳送至已發布的工作。如果指標已過期,已張貼的工作應不會執行任何動作。
Golang 是相當熱門的範例,在其語言設計中納入這項原則。
既有藝術
這類輕量機制就是一個週期性的主題,可確保一組工作依序執行,不必啟動作業系統執行緒:
- Chromium 專案定義類似的序列概念:Chrome 的執行緒和工作。
- Java Platform 已新增虛擬執行緒。