欢迎!您可能不喜欢使用 C++ 编写描述多步异步操作的代码。
fpromise::promise<>
[1] 可让这变得更简单。本指南介绍了异步控制流编程中的常见问题,并在 fpromise::promise<>
库中提供了解决这些问题的常见使用模式。
是什么让异步代码颇具挑战性?
在 fpromise::promise<>
库中,异步任务是指由多个具有显式挂起点的同步代码块组成的任务。
定义异步任务时,必须有针对以下问题的解决方案:
表示控制流:同步块的顺序如何以及数据在同步块之间如何流动?如何以易于理解的方式实现这一目标?
状态和资源管理:需要哪种中间状态来支持任务执行?必须捕获哪些外部资源?如何表达这种表达方式?如何安全地实现这种行为?
术语
fpromise::promise<>
是一个只移动对象,由一组描述异步任务的 lambda 或回调组成,最终会生成值或错误。- 处理程序函数是创建 promise 时提供的回调。
- 接续函数是对现有 promise 的各种接续方法提供的回调。
fpromise::executor
负责调度和执行 promise。仅当将 Promise 的所有权转让给fpromise::executor
之后,Promise 才会运行。此时,执行器负责自己的调度和执行。- 可以选择将
fpromise::context
传递给处理程序和延续函数,以获取对fpromise::executor
以及低级别的挂起和恢复控件的访问权限。
构建和执行您的首个 fpromise::promise<>
我们来编写一个简单的 promise。
#include <lib/fpromise/promise.h>
...
fpromise::promise<> p = fpromise::make_promise([] {
// This is a handler function.
auto world_is_flat = AssessIfWorldIsFlat();
if (world_is_flat) {
return fpromise::error();
}
return fpromise::ok();
});
p
现在包含一个用于描述简单任务的 promise。
为了运行 promise,必须将其安排在 fpromise::executor
的实现上。最常用的执行器是 async::Executor
[2],它会在 async_dispatcher_t
上调度回调。为便于测试和探索,您还可以使用 fpromise::single_threaded_executor
及其关联的方法 fpromise::run_single_threaded()
[3]。
// When a promise is scheduled, the `fpromise::executor` takes ownership of it.
fpromise::result<> result = fpromise::run_single_threaded(std::move(p));
assert(result.is_ok());
构建更复杂的 fpromise::promise<>
返回、错误类型和解决状态
如上所述,fpromise::promise<>
的模板参数表示返回和错误类型:
fpromise::promise<ValueType, ErrorType>
错误类型可以省略,并采用默认错误类型 void
(例如 fpromise::promise<MyValueType>
等同于 fpromise::promise<MyValueType,
void>
)。
在执行期间,promise 最终必须达到以下状态之一:
- 成功:处理程序函数或最后一个接续函数(见下文)已返回
fpromise::ok()
。 - 错误:处理程序函数或某个接续函数已返回
fpromise::error()
,并且没有后续接续函数拦截了它。 - 已放弃:promise 在解析为成功或错误之前已销毁。
.then()
、.and_then()
、.or_else()
:链接异步块
通常,复杂的任务可以分解为更小、更精细的任务。这些任务中的每项任务都需要异步执行,但如果任务之间存在某种依赖关系,则需要保留它们。这可以通过不同的组合器来实现,例如:
- 无论任务 1 的状态如何,
fpromise::promise::then()
都可用于定义任务依赖项,例如,应先执行任务 1,然后再执行任务 2。上一个任务的结果通过fpromise::result<ValueType, ErrorType>&
或const fpromise::result<ValueType, ErrorType>&
类型的参数接收。
auto execute_task_1_then_task_2 =
fpromise::make_promise([]() -> fpromise::result<ValueType, ErrorType> {
...
}).then([](fpromise::result<ValueType, ErrorType>& result) {
if (result.is_ok()) {
...
} else { // result.is_error()
...
}
});
- 只有在任务 1 成功的情况下,
fpromise::promise::and_then()
才可用于定义任务依赖项。上一个任务的结果通过ValueType&
或ValueType&
类型的参数接收。
auto execute_task_1_then_task_2 =
fpromise::make_promise([]() { ... }).and_then([](ValueType& success_value) {
...
});
- 只有在任务 1 失败的情况下,
fpromise::promise::or_else()
才可用于定义任务依赖项。上一个任务的结果通过ErrorType&
或const ErrorType&
类型的参数接收。
auto execute_task_1_then_task_2 =
fpromise::make_promise([]() { ... }).or_else([](ErrorType& failure_value) {
...
});
fpromise::join_promises()
和 fpromise::join_promise_vector()
:并行执行
有时,可以执行多个 promise,并且它们之间不存在依赖关系,但聚合结果取决于下一个异步步骤。在本例中,fpromise::join_promises()
和 fpromise::join_promise_vector()
用于联接多个 promise 的结果。
如果变量引用了每个 promise,则使用 fpromise::join_promises()
。fpromise::join_promises()
支持异构 promise 类型。先前任务的结果通过 std::tuple<...>&
或 const
std::tuple<...>&
类型的参数接收。
auto DoImportantThingsInParallel() {
auto promise1 = FetchStringFromDbAsync("foo");
auto promise2 = InitializeFrobinatorAsync();
return fpromise::join_promises(std::move(promise1), std::move(promise2))
.and_then([](std::tuple<fpromise::result<std::string>,
fpromise::result<Frobinator>>& results) {
return fpromise::ok(std::get<0>(results).value() +
std::get<1>(results).value().GetFrobinatorSummary());
});
}
如果 promise 存储在 std::vector<>
中,则使用 fpromise::join_promise_vector()
。这会增加一个限制条件,即所有 promise 都必须是同构的(属于同一类型)。先前任务的结果通过 std::vector<fpromise::result<ValueType, ErrorType>>&
或 const std::vector<fpromise::result<ValueType, ErrorType>>&
类型的参数接收。
auto ConcatenateImportantThingsDoneInParallel() {
std::vector<fpromise::promise<std::string>> promises;
promises.push_back(FetchStringFromDbAsync("foo"));
promises.push_back(FetchStringFromDbAsync("bar"));
return fpromise::join_promise_vector(std::move(promises))
.and_then([](std::vector<fpromise::result<std::string>>& results) {
return fpromise::ok(results[0].value() + "," + results[1].value());
});
}
return fpromise::make_promise()
:通过返回新 promise 来链接或分支
将有关将哪个 promise 串联起来的决定推迟到运行时可能会很有用。此方法与语法链(通过使用连续的 .then()
、.and_then()
和 .or_else()
调用)执行链式操作相反。
处理程序函数可能会返回一个新的 promise,该 promise 将在处理程序函数返回后进行求值,而不是返回 fpromise::result<...>
(使用 fpromise::ok
或 fpromise::error
)。
fpromise::make_promise(...)
.then([] (fpromise::result<>& result) {
if (result.is_ok()) {
return fpromise::make_promise(...); // Do work in success case.
} else {
return fpromise::make_promise(...); // Error case.
}
});
此模式还有助于将可能长 promise 的内容分解为更小的可读区块,例如让接续函数返回上例中 DoImportantThingsInParallel()
的结果。
声明中间状态并使其保持活跃状态
某些任务要求状态仅在 promise 本身处于待处理或正在执行状态时保持活跃状态。由于需要共享此状态,因此该状态不适合移至任何给定的 lambda;如果希望其生命周期与 promise 耦合,此状态也不适合将所有权转移到生命周期更长的容器。
虽然不是唯一的解决方案,但同时使用 std::unique_ptr<>
和 std::shared_ptr<>
是常见模式:
std::unique_ptr<>
fpromise::promise<> MakePromise() {
struct State {
int i;
};
// Create a single std::unique_ptr<> container for an instance of State and
// capture raw pointers to the state in the handler and continuations.
//
// Ownership of the underlying memory is transferred to a lambda passed to
// `.inspect()`. |state| will die when the returned promise is resolved or is
// abandoned.
auto state = std::make_unique<State>();
state->i = 0;
return fpromise::make_promise([state = state.get()] { state->i++; })
.and_then([state = state.get()] { state->i--; })
.inspect([state = std::move(state)](const fpromise::result<>&) {});
}
std::shared_ptr<>
fpromise::promise<> MakePromise() {
struct State {
int i;
};
// Rely on shared_ptr's reference counting to destroy |state| when it is safe
// to do so.
auto state = std::make_shared<State>();
state->i = 0;
return fpromise::make_promise([state] { state->i++; }).and_then([state] {
state->i--;
});
}
fpromise::scope
:放弃 promise,以避免违反内存安全规定
fpromise::scope
可用于将 fpromise::promise<>
的生命周期与内存中的资源关联起来。例如:
#include <lib/fpromise/scope.h>
class A {
public:
fpromise::promise<> MakePromise() {
// Capturing |this| is dangerous: the returned promise will be scheduled
// and executed in an unknown context. Use |scope_| to protect against
// possible memory safety violations.
//
// The call to `.wrap_with(scope_)` abandons the promise if |scope_| is
// destroyed. Since |scope_| and |this| share the same lifecycle, it is safe
// to capture |this|.
return fpromise::make_promise([this] {
// |foo_| is critical to the operation!
return fpromise::ok(foo_.Frobinate());
})
.wrap_with(scope_);
}
private:
Frobinator foo_;
fpromise::scope scope_;
};
void main() {
auto a = std::make_unique<A>();
auto promise = a->MakePromise();
a.reset();
// |promise| will not run any more, even if scheduled, protected access to the
// out-of-scope resources.
}
fpromise::sequencer
:在单独的 promise 完成时屏蔽 promise
TODO:您可以使用 .wrap_with(sequencer) 阻止此 promise,以便在使用同一个排序器对象封装的最后一个 promise 完成时阻塞
#include <lib/fpromise/sequencer.h>
// TODO
fpromise::bridge
:与基于回调的异步函数集成
TODO:fpromise::bridge 可用于将基于回调的异步函数串联延续
#include <lib/fpromise/bridge.h>
// TODO
fpromise::bridge
:分离单个接续链的执行
TODO:fpromise::bridge 也可用于将一个接续链分离为两个可在不同 fpromise::executor
实例上执行的 promise。
常见陷阱
and_then
或 or_else
序列必须具有兼容的类型
使用 and_then
构建 promise 时,每个连续的连续可能具有不同的 ValueType,但必须具有相同的 ErrorType,因为 and_then
会转发先前的错误,而不使用它们。
使用 or_else
构建 promise 时,每个连续的连续可能具有不同的 ErrorType,但必须具有相同的 ValueType,因为 or_else
会转发先前的值,而不使用这些值。
如需更改序列中间的类型,请使用 then
使用之前的结果并生成所需类型的新结果。
以下示例无法编译,因为上一个 and_then
处理程序返回的错误类型与前一个处理程序的结果不兼容。
auto a = fpromise::make_promise([] {
// returns fpromise::result<int, void>
return fpromise::ok(4);
}).and_then([] (const int& value) {
// returns fpromise::result<float, void>
return fpromise::ok(value * 2.2f);
}).and_then([] (const float& value) {
// ERROR! Prior result had "void" error type but this handler returns const
// char*.
if (value >= 0)
return fpromise::ok(value);
return fpromise::error("bad value");
}
使用 then
来使用结果并更改其类型:
auto a = fpromise::make_promise([] {
// returns fpromise::result<int, void>
return fpromise::ok(4);
}).and_then([] (const int& value) {
// returns fpromise::result<float, void>
return fpromise::ok(value * 2.2f);
}).then([] (const fpromise::result<float>& result) -> fpromise::result<float, const char*> {
if (result.is_ok() && result.value() >= 0)
return fpromise::ok(value);
return fpromise::error("bad value");
}
处理程序 / 接续函数可以返回 fpromise::result<>
或新的 fpromise::promise<>
,但不能同时返回两者
您可能想要编写一个处理程序,以便在一个条件分支中返回 fpromise::promise<>
,在另一个条件分支中返回 fpromise::ok()
或 fpromise::error()
。这是非法的,因为编译器无法将 fpromise::result<>
转换为 fpromise::promise<>
。
权宜解决方法是返回可解析为所需结果的 fpromise::promise<>
:
auto a = fpromise::make_promise([] {
if (condition) {
return MakeComplexPromise();
}
return fpromise::make_ok_promise(42);
});
接续签名
您是否遇到过这样的错误消息?
../../sdk/lib/fit-promise/include/lib/fpromise/promise_internal.h:342:5: error: static_assert failed "The provided handler's last argument was expected to be of type V& or const V& where V is the prior result's value type and E is the prior result's error type. Please refer to the combinator's documentation for
a list of supported handler function signatures."
或:
../../sdk/lib/fit-promise/include/lib/fpromise/promise.h:288:5: error: static_assert failed due to requirement '::fpromise::internal::is_continuation<fpromise::internal::and_then_continuation<fpromise::promise_impl<fit::function_impl<16, false, fpromise::result<fuchsia::modular::storymodel::StoryModel, void> (fpromise::context &)> >, (lambda at ../../src/modular/bin/sessionmgr/story/model/ledger_story_model_storage.cc:222:17)>, void>::value' "Continuation type is invalid. A continuation is a callable object with this signature: fpromise::result<V, E>(fpromise::context&)."
这很可能意味着其中一个接续函数的签名无效。不同接续函数的有效签名如下所示:
对于 .then()
:
.then([] (fpromise::result<V, E>& result) {});
.then([] (const fpromise::result<V, E>& result) {});
.then([] (fpromise::context& c, fpromise::result<V, E>& result) {});
.then([] (fpromise::context& c, const fpromise::result<V, E>& result) {});
对于 .and_then()
:
.and_then([] (V& success_value) {});
.and_then([] (const V& success_value) {});
.and_then([] (fpromise::context& c, V& success_value) {});
.and_then([] (fpromise::context& c, const V& success_value) {});
对于 .or_else()
:
.or_else([] (E& error_value) {});
.or_else([] (const E& error_value) {});
.or_else([] (fpromise::context& c, E& error_value) {});
.or_else([] (fpromise::context& c, const E& error_value) {});
对于 .inspect()
:
.inspect([] (fpromise::result<V, E>& result) {});
.inspect([] (const fpromise::result<V, E>& result) {});
捕获和参数生命周期
promise 由处理程序和接续函数(通常是 lambda)组成。在构建 lambda 捕获列表时必须格外小心,以避免捕获在执行相关处理程序或接续时无效的内存。
例如,此 promise 会捕获在 Foo() 返回时(也就是在调度和执行返回的 promise 时)保证无效的内存。
fpromise::promise<> Foo() {
int i;
return fpromise::make_promise([&i] {
i++; // |i| is only valid within the scope of Foo().
});
}
实际代码中的实例更细微。一个不太明显的示例:
fpromise::promise<> Foo() {
return fpromise::make_promise(
[i = 0] { return fpromise::make_promise([&i] { i++; }); });
}
fpromise::promise
会快速销毁处理程序和接续函数:最外层的处理程序一旦返回最内层的处理程序,便会被销毁。如需了解在这种情况下使用的正确模式,请参阅上文中的“声明中间状态并使其保持活动状态”。
>>> 个章节
- 从一种错误类型转换为另一种错误类型
- fpromise::bridge
- 常见陷阱: 捕获状态的生命周期