Writing correct asynchronous programs with multiple threads requires care in C++. Here we describe a particular pattern that helps avoid errors, and which will integrate well with the C++ FIDL bindings and component runtime.
Background
Asynchronous runtimes
The async library defines the interface for initiating asynchronous operations on Fuchsia. It defines an opaque async_dispatcher_t type and associated functions.
There are several implementations of this dispatcher interface. A popular one is async_loop_t and its C++ wrapper async::Loop. Libraries that perform asynchronous work generally should not know which concrete implementation is in use. Instead, they call functions through the async_dispatcher_t* interface.
Thread safety
The reader should familiarize themselves with the terminology around thread safety if needed. See CppCon 2018: Geoffrey Romer “What do you mean "thread-safe"?” for a definition of thread safety that is endorsed by Google's C++ team.
A program that upholds thread safety avoids data races: broadly, reading and writing the same data without a defined ordering between those operations (see precise definition of a data race in the C++ standard). These races are a source of errors because they lead to undefined behavior at run-time.
An individual C++ type also has categorizations around thread safety. Following the common-practice interpretations from Abseil:
- A C++ object is thread-safe if concurrent usage does not cause data races.
- A C++ object is thread-unsafe if any concurrent usage may cause data races.
One may wrap a thread-unsafe type with synchronization primitives e.g. mutexes to make it thread-safe. This is called adding external synchronization. Doing so adds overhead, and not all users will use that type concurrently. Hence it's common for a library to be thread-unsafe by default, and require the user to add synchronization if desired. Such types may have comments like the following:
// This class is thread-unsafe. Methods require external synchronization.
class SomeUnsafeType { /* ... */ };
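As a minimal sketch of external synchronization in standard C++, the following wraps a thread-unsafe type so that every access goes through a mutex. The names Counter, SynchronizedCounter, and CountFromThreads are hypothetical and exist only for illustration; they are not part of any Fuchsia library.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical thread-unsafe type. Methods require external synchronization.
class Counter {
 public:
  void Increment() { ++value_; }
  int value() const { return value_; }

 private:
  int value_ = 0;
};

// Hypothetical wrapper that adds external synchronization with a mutex.
class SynchronizedCounter {
 public:
  void Increment() {
    std::lock_guard<std::mutex> guard(mutex_);
    counter_.Increment();
  }
  int value() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return counter_.value();
  }

 private:
  mutable std::mutex mutex_;
  Counter counter_;
};

// Increments one shared counter from several threads and returns the total.
int CountFromThreads(int num_threads, int increments_per_thread) {
  SynchronizedCounter counter;
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&counter, increments_per_thread] {
      for (int j = 0; j < increments_per_thread; ++j) {
        counter.Increment();
      }
    });
  }
  for (std::thread& t : threads) {
    t.join();
  }
  return counter.value();
}
```

Every caller pays for the mutex here, including callers that only ever use the counter from one thread, which is the overhead that motivates leaving types thread-unsafe by default.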
Thread safety in asynchronous code
Achieving thread safety gets more subtle in asynchronous code due to the presence of callbacks. Consider the following snippet:
// |CsvParser| asynchronously reads from a file, and parses the contents as
// comma separated values.
class CsvParser {
 public:
  void Load() {
    reader_.AsyncRead([this] (std::string data) {
      values_ = Parse(data);
    });
  }

  std::vector<std::string> Parse(const std::string& data);

 private:
  FileReader reader_;
  std::vector<std::string> values_;
};
AsyncRead will complete the work in the background, then call the lambda specified as the callback function when the work completes. Because the lambda captures this, it is commonly referred to as an "upcall": the reader_ that is owned by an instance of CsvParser makes a call to the owner.
Let's consider how to avoid races between this callback and the destruction of CsvParser. Adding a mutex in CsvParser won't help, because the mutex would be destroyed along with CsvParser. One may require that CsvParser always be reference counted, but that results in an opinionated API and tends to recursively cause everything referenced by CsvParser to also be reference counted.
If we ensure that there is always a defined ordering between the destruction of CsvParser and the invocation of the callback, then the race condition is avoided. On Fuchsia, the callback is typically scheduled on an async_dispatcher_t object (termed dispatcher for short). A common pattern is to use a single-threaded dispatcher:
- Use an async::Loop as the dispatcher implementation.
- Only run one thread to service the loop.
- Only destroy upcall targets on that thread, and cancel future upcalls at the same time. For example, destroy the CsvParser within a task posted to that dispatcher.
Since the same thread invokes asynchronous callbacks and destroys the instance, there must be a defined ordering between those operations.
The general case of the above pattern is to ensure synchronized access: every access (including construction and destruction) of an object will observe the side-effects of previous accesses. In other literature about threading, you may see the term synchronized access always associated with locking, for example taking a mutex lock before accessing an object. In Fuchsia C++, locks alone would not be sufficient as discussed above, and we use properties of the async dispatcher to achieve synchronized access, such that user code does not have to take locks. The next section will go into detail.
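The single-threaded pattern described above can be sketched in portable C++ without any Fuchsia APIs. ToyLoop is a hypothetical stand-in for a single-threaded dispatcher (it is not async::Loop), and Parser is a hypothetical upcall target that cancels its pending upcalls when destroyed. Both are assumptions made for illustration only.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded dispatcher: tasks run in FIFO order on
// whichever single thread calls RunUntilIdle().
class ToyLoop {
 public:
  void Post(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  void RunUntilIdle() {
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }

 private:
  std::deque<std::function<void()>> tasks_;
};

// Hypothetical upcall target. Pending upcalls are canceled on destruction.
class Parser {
 public:
  Parser(ToyLoop* loop, std::vector<std::string>* log) : loop_(loop), log_(log) {}
  ~Parser() { *alive_ = false; }

  void Load(std::string data) {
    loop_->Post([this, alive = alive_, data = std::move(data)] {
      if (!*alive) {
        return;  // The target was destroyed earlier on this same thread.
      }
      log_->push_back("parsed " + data);  // Upcall into the owner.
    });
  }

 private:
  ToyLoop* loop_;
  std::vector<std::string>* log_;
  std::shared_ptr<bool> alive_ = std::make_shared<bool>(true);
};

std::vector<std::string> RunDemo() {
  ToyLoop loop;
  std::vector<std::string> log;
  auto parser = std::make_unique<Parser>(&loop, &log);
  parser->Load("a");
  // Destroy the upcall target within a task posted to the same loop.
  loop.Post([&parser, &log] {
    parser.reset();
    log.push_back("destroyed");
  });
  parser->Load("b");  // Queued behind the destruction task, so it is dropped.
  loop.RunUntilIdle();
  return log;
}
```

Because one thread runs every task, the flag needs no atomics or locks: the destruction task and each upcall are always ordered relative to one another.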
Synchronized dispatchers
A synchronized dispatcher is one where posted tasks are run in order, and each task will observe the side-effects from previous tasks.
Because objects dealing with asynchronous logic are accessed from dispatchers, one cannot also access the object from arbitrary threads, as the dispatcher might be concurrently accessing the same object, resulting in data races. In fact, one must always access the object from a single dispatcher associated with that object. The dispatcher must also ensure ordering between operations. We call such dispatchers synchronized dispatchers. There are two ways for a dispatcher to qualify as synchronized:
Support sequences
A dispatcher may promise that tasks posted on that dispatcher always run with a strict ordering. Such dispatchers are said to support sequences: sequential execution domains that run a series of tasks where one task will observe all side-effects from previous tasks, but where the underlying execution may hop from one thread to another.
Synchronized driver dispatchers (e.g. fdf::SynchronizedDispatcher) are an example of dispatchers that support sequences. See driver threading model. On the other hand, async::Loop does not support sequences, as the user may call Loop::StartThread many times to introduce multiple threads that race to execute tasks in that loop.
Stay single threaded
If the dispatcher does not support sequences, then code running in tasks posted to that dispatcher is ordered if that dispatcher is only serviced by a single thread, for example, a single-threaded async::Loop.
In summary, either the dispatcher supports sequences in which case the object must be used on that sequence, or the code runs on a single dispatcher thread and the object must be used on that thread. Use covers construction, destruction, and calling instance methods.
Synchronized dispatchers are a unit of concurrency: tasks posted to the same synchronized dispatcher are never run concurrently alongside one another. Tasks posted to different synchronized dispatchers may potentially run concurrently alongside one another.
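To make the idea of a sequence more concrete, here is a toy illustration in standard C++. ToySequence is a hypothetical class, not a Fuchsia API and not how driver dispatchers are implemented. It deliberately runs every task on a different thread to show that ordering and visibility of side-effects, rather than thread identity, are what define a sequence.

```cpp
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy sequence: tasks run strictly one after another, but each
// task runs on a freshly started thread.
class ToySequence {
 public:
  void Post(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  // Runs every posted task in order. Joining each thread before starting the
  // next one means the next task observes all side-effects of earlier tasks.
  void Run() {
    for (std::function<void()>& task : tasks_) {
      std::thread worker(std::move(task));
      worker.join();
    }
    tasks_.clear();
  }

 private:
  std::vector<std::function<void()>> tasks_;
};

std::vector<int> RunSequenceDemo() {
  ToySequence sequence;
  std::vector<int> values;  // Plain data: no mutex and no atomics.
  for (int i = 0; i < 5; ++i) {
    sequence.Post([&values, i] { values.push_back(i); });
  }
  sequence.Run();
  return values;
}
```

The vector is touched by five different threads, yet there is no data race, because no two tasks ever run concurrently and each one is ordered after the previous one.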
Check for synchronized dispatchers
The async library offers a BasicLockable type, async::synchronization_checker. You may call .lock() or lock the checker using a std::lock_guard whenever a function requires synchronized access. Doing so checks that the function is called from a dispatcher with such a guarantee, without actually taking any locks. If the check fails, the program will panic. It is recommended that thread-unsafe types check for synchronization at runtime by carrying a checker. Here is a full example:
// |ChannelReader| lets one asynchronously read from a Zircon channel.
//
// ## Thread safety
//
// Instances must be used from a synchronized async dispatcher. See
// https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/thread-safe-async#synchronized-dispatcher
class ChannelReader {
 public:
  ChannelReader(async_dispatcher_t* dispatcher, zx::channel channel)
      : dispatcher_(dispatcher),
        checker_(dispatcher),
        channel_(std::move(channel)),
        wait_(channel_.get(), ZX_CHANNEL_READABLE) {}

  ~ChannelReader() {
    // |lock| explicitly checks that the dispatcher is not calling callbacks
    // that use this |ChannelReader| instance in the meantime.
    checker_.lock();
  }

  // Asynchronously wait for the channel to become readable, then read the
  // data into a member variable.
  void AsyncRead() {
    // This guard checks that the |AsyncRead| method is called from a task
    // running on a synchronized dispatcher.
    std::lock_guard guard(checker_);
    data_.clear();
    zx_status_t status = wait_.Begin(
        // The dispatcher that will perform the waiting.
        dispatcher_,
        // The async dispatcher will call this callback when the channel is
        // ready to be read from. Because this callback captures `this`, we
        // must ensure the callback does not race with destroying the
        // |ChannelReader| instance. This is accomplished by calling
        // `checker_.lock()` in the |ChannelReader| destructor.
        [this](async_dispatcher_t* dispatcher, async::WaitBase* wait, zx_status_t status,
               const zx_packet_signal_t* signal) {
          if (status != ZX_OK) {
            return;
          }
          std::lock_guard guard(checker_);
          uint32_t actual;
          data_.resize(ZX_CHANNEL_MAX_MSG_BYTES);
          status = channel_.read(0, data_.data(), nullptr,
                                 static_cast<uint32_t>(data_.capacity()), 0, &actual, nullptr);
          if (status != ZX_OK) {
            data_.clear();
            return;
          }
          data_.resize(actual);
        });
    ZX_ASSERT(status == ZX_OK);
  }

  std::vector<uint8_t> data() const {
    // Here we also verify synchronization, because we want to avoid race
    // conditions such as the user calling |AsyncRead| which clears the
    // data and calling |data| to get the data at the same time.
    std::lock_guard guard(checker_);
    return data_;
  }

 private:
  async_dispatcher_t* dispatcher_;
  async::synchronization_checker checker_;
  zx::channel channel_;
  std::vector<uint8_t> data_ __TA_GUARDED(checker_);
  async::WaitOnce wait_;
};
zx::channel c1, c2;
ASSERT_EQ(ZX_OK, zx::channel::create(0, &c1, &c2));
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
ChannelReader reader{loop.dispatcher(), std::move(c1)};
ASSERT_EQ(reader.data(), std::vector<uint8_t>{});
const std::vector<uint8_t> kData{1, 2, 3};
ASSERT_EQ(ZX_OK, c2.write(0, kData.data(), static_cast<uint32_t>(kData.size()), nullptr, 0));
// Using |reader| must be synchronized with dispatching asynchronous operations.
// Here, they are synchronized because we perform these one after the other
// from a single thread.
reader.AsyncRead();
loop.RunUntilIdle();
ASSERT_EQ(reader.data(), kData);
// The following is disallowed, and would lead to a panic.
// If the dispatcher is running from a different thread, then we cannot
// ensure that |reader| is not used in the meantime.
//
// std::thread([&] { loop.RunUntilIdle(); }).join();
fidl::Client is another example of a type that checks for synchronized access at runtime: destroying a fidl::Client on a non-dispatcher thread will lead to a panic. There are other C++ classes in the Fuchsia code base that do the same. They will usually highlight this with a comment such as the following:
// This class is thread-unsafe. Instances must be used and managed from a
// synchronized dispatcher.
class SomeAsyncType { /* ... */ };
See C++ FIDL threading guide for a concrete discussion of this scenario when using FIDL bindings.
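For readers who want to experiment with the checker idea outside of Fuchsia, the following is a simplified analogy in standard C++. ToyThreadChecker is a hypothetical type that only covers the single-thread case: it remembers the thread that created it and rejects use from any other thread. It is an assumption for illustration, not a description of how async::synchronization_checker is implemented, and it throws an exception where the real checker would panic so that the behavior can be observed.

```cpp
#include <mutex>
#include <stdexcept>
#include <thread>

// Hypothetical BasicLockable checker. "Locking" it takes no lock. It only
// verifies that the caller is on the thread that created the checker.
class ToyThreadChecker {
 public:
  ToyThreadChecker() : owner_(std::this_thread::get_id()) {}

  void lock() const {
    if (std::this_thread::get_id() != owner_) {
      throw std::logic_error("used from the wrong thread");
    }
  }
  void unlock() const {}

 private:
  const std::thread::id owner_;
};

// Hypothetical thread-unsafe type that carries a checker.
class Accumulator {
 public:
  void Add(int x) {
    std::lock_guard<ToyThreadChecker> guard(checker_);
    total_ += x;
  }
  int total() {
    std::lock_guard<ToyThreadChecker> guard(checker_);
    return total_;
  }

 private:
  ToyThreadChecker checker_;
  int total_ = 0;
};

// Returns true if a call from a second thread is rejected and leaves the
// accumulated total untouched.
bool UseFromOtherThreadIsRejected() {
  Accumulator acc;
  acc.Add(1);
  bool rejected = false;
  std::thread other([&acc, &rejected] {
    try {
      acc.Add(2);
    } catch (const std::logic_error&) {
      rejected = true;
    }
  });
  other.join();
  return rejected && acc.total() == 1;
}
```

Because the checker satisfies BasicLockable, it composes with std::lock_guard, which is the same usage style shown in the ChannelReader example.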
Discard callbacks during destruction
You may have noticed that for the ChannelReader example above to work, the callback passed to wait_.Begin(...) must be silently discarded, instead of called with some error, if ChannelReader is destroyed. Indeed the documentation on async::WaitOnce mentions that it "automatically cancels the wait when it goes out of scope".
During destruction, some C++ objects discard the registered callbacks if those have yet to be called. These kinds of APIs are said to guarantee at most once delivery. async::Wait and async::Task are examples of such objects. This style works well when the callback references a single receiver that owns the wait/task, i.e. the callback is an upcall. These APIs are typically also thread-unsafe and require the aforementioned synchronized access.
By contrast, async::PostTask may call the provided callback even if an object captured by the callback is destroyed. In fact async::PostTask will always call the provided callback unless the async dispatcher is shut down.
You may use async_patterns::TaskScope if you need to post tasks that can be canceled: destroying an async_patterns::TaskScope will discard unexecuted callbacks scheduled on that scope.
Other objects will always call the registered callback exactly once, even during destruction. Those calls would typically provide an error or status indicating cancellation. They are said to guarantee exactly once delivery.
One should consult the corresponding documentation when using an asynchronous API to understand the cancellation semantics.
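The difference between the two delivery guarantees can be illustrated with a pair of toy classes in standard C++. AtMostOnceOp, ExactlyOnceOp, and Status are hypothetical names invented for this sketch; they do not correspond to any Fuchsia type.

```cpp
#include <functional>
#include <utility>
#include <vector>

enum class Status { kOk, kCanceled };
using Callback = std::function<void(Status)>;

// Hypothetical operation with at most once delivery: if it is destroyed
// before completing, the callback is silently discarded.
class AtMostOnceOp {
 public:
  explicit AtMostOnceOp(Callback callback) : callback_(std::move(callback)) {}

  void Complete() {
    if (Callback callback = std::exchange(callback_, nullptr)) {
      callback(Status::kOk);
    }
  }

 private:
  Callback callback_;
};

// Hypothetical operation with exactly once delivery: if it is destroyed
// before completing, the callback is called with a cancellation status.
class ExactlyOnceOp {
 public:
  explicit ExactlyOnceOp(Callback callback) : callback_(std::move(callback)) {}
  ~ExactlyOnceOp() {
    if (callback_) {
      callback_(Status::kCanceled);
    }
  }

  void Complete() {
    if (Callback callback = std::exchange(callback_, nullptr)) {
      callback(Status::kOk);
    }
  }

 private:
  Callback callback_;
};

// Each helper returns the statuses delivered over the operation's lifetime.
std::vector<Status> DeliveredAtMostOnce(bool complete_before_destroy) {
  std::vector<Status> delivered;
  {
    AtMostOnceOp op([&delivered](Status s) { delivered.push_back(s); });
    if (complete_before_destroy) op.Complete();
  }
  return delivered;
}

std::vector<Status> DeliveredExactlyOnce(bool complete_before_destroy) {
  std::vector<Status> delivered;
  {
    ExactlyOnceOp op([&delivered](Status s) { delivered.push_back(s); });
    if (complete_before_destroy) op.Complete();
  }
  return delivered;
}
```

With the exactly once style, a callback that captures an object must tolerate running while that object is being torn down, which is why the cancellation semantics of each API matter.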
If you need to work with an exactly once callback API while the object captured by the callback may go out of scope, consider using an async_patterns::Receiver in your object. The Receiver allows thread-unsafe asynchronous objects to silently cancel callbacks directed towards it when going out of scope.
Use an object belonging to a different synchronized dispatcher
To maintain synchronized access, one may manage and use a group of objects on the same synchronized dispatcher. Those objects can synchronously call into one another without breaking the synchronization checks. A special case of this is an application that runs everything on a single async::Loop with a single thread, typically called the "main thread".
More complex applications may have multiple synchronized dispatchers. When individual objects must be used from their corresponding synchronized dispatcher, a question arises: how does one object call another object if they are associated with different dispatchers?
A time-tested approach is to have the objects send messages between one another, as opposed to synchronously calling their instance methods. Concretely, this could mean that if object A needs to do something to object B, A would post an asynchronous task to B's dispatcher. The task (usually a lambda function) may then synchronously use B because it is already running under B's dispatcher and will be synchronized with other tasks that use B.
When tasks are posted to a different dispatcher, it's harder to safely discard them when the receiver object goes out of scope. Here are some approaches:
- One may use async_patterns::DispatcherBound to both own and make calls to a child object that lives on a different synchronized dispatcher.
- One may use async_patterns::Receiver to let other objects make calls on their objects, without forcing an ownership relationship. The calls are silently canceled if the receiver is destroyed.
- One may reference count the objects, and pass a weak pointer to the posted task. The posted task should do nothing if the pointer is expired.
Go is a popular example of a language that bakes this message-passing principle into its design.
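The weak pointer approach listed above can be sketched with standard library types only. Inbox, MakeDeliveryTask, and RunWeakPointerDemo are hypothetical names, and the std::deque merely stands in for the receiver's dispatcher; none of this is Fuchsia API.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical receiver object that is used only from its own dispatcher.
struct Inbox {
  std::vector<std::string> messages;
};

// Builds a task that holds only a weak pointer to the receiver. |delivered|
// counts the tasks that found the receiver still alive.
std::function<void()> MakeDeliveryTask(std::weak_ptr<Inbox> weak, std::string message,
                                       int* delivered) {
  return [weak = std::move(weak), message = std::move(message), delivered] {
    if (std::shared_ptr<Inbox> inbox = weak.lock()) {
      inbox->messages.push_back(message);
      ++*delivered;
    }
    // Otherwise the receiver has expired and the task does nothing.
  };
}

// Posts one message while the receiver is alive and one that only runs after
// the receiver is destroyed. Returns the number of delivered messages.
int RunWeakPointerDemo() {
  std::deque<std::function<void()>> queue;  // Stand-in for the dispatcher.
  auto inbox = std::make_shared<Inbox>();
  int delivered = 0;

  queue.push_back(MakeDeliveryTask(inbox, "first", &delivered));
  queue.front()();
  queue.pop_front();

  queue.push_back(MakeDeliveryTask(inbox, "second", &delivered));
  inbox.reset();    // The receiver goes away while a task is still pending.
  queue.front()();  // Runs safely and does nothing.
  queue.pop_front();
  return delivered;
}
```

The sender never touches the receiver directly; it only enqueues work, so the receiver's lifetime is decided entirely on the receiving side.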
Prior art
Lightweight mechanisms for ensuring that a set of tasks execute one after the other, without necessarily starting operating system threads, are a recurring theme:
- The Chromium project defines a similar sequence concept: Threading and tasks in Chrome.
- The Java Platform added virtual threads.