fpromise::promise<> 用户指南

欢迎!您可能不喜欢使用 C++ 编写描述多步异步操作的代码。

fpromise::promise<> [1] 可让这变得更简单。本指南介绍了异步控制流编程中的常见问题,并在 fpromise::promise<> 库中提供了解决这些问题的常见使用模式。

是什么让异步代码颇具挑战性?

fpromise::promise<> 库中,异步任务是指由多个具有显式挂起点的同步代码块组成的任务。

定义异步任务时,必须有针对以下问题的解决方案:

  1. 表示控制流:同步块的顺序如何以及数据在同步块之间如何流动?如何以易于理解的方式实现这一目标?

  2. 状态和资源管理:需要哪种中间状态来支持任务执行?必须捕获哪些外部资源?如何表达这种表达方式?如何安全地实现这种行为?

术语

  • fpromise::promise<> 是一个只移动对象,由一组描述异步任务的 lambda 或回调组成,最终会生成值或错误。
  • 处理程序函数是创建 promise 时提供的回调。
  • 接续函数是对现有 promise 的各种接续方法提供的回调。
  • fpromise::executor 负责调度和执行 promise。仅当将 Promise 的所有权转让给 fpromise::executor 之后,Promise 才会运行。此时,执行器负责自己的调度和执行。
  • 可以选择将 fpromise::context 传递给处理程序和延续函数,以获取对 fpromise::executor 以及低级别的挂起和恢复控件的访问权限。

构建和执行您的首个 fpromise::promise<>

我们来编写一个简单的 promise。

#include <lib/fpromise/promise.h>

...
fpromise::promise<> p = fpromise::make_promise([] {
  // This is a handler function.

  auto world_is_flat = AssessIfWorldIsFlat();
  if (world_is_flat) {
    return fpromise::error();
  }
  return fpromise::ok();
});

p 现在包含一个用于描述简单任务的 promise。

为了运行 promise,必须将其安排在 fpromise::executor 的实现上。最常用的执行器是 async::Executor [2],它会在 async_dispatcher_t 上调度回调。为便于测试和探索,您还可以使用 fpromise::single_threaded_executor 及其关联的方法 fpromise::run_single_threaded() [3]。

// When a promise is scheduled, the `fpromise::executor` takes ownership of it.
fpromise::result<> result = fpromise::run_single_threaded(std::move(p));
assert(result.is_ok());

构建更复杂的 fpromise::promise<>

返回、错误类型和解决状态

如上所述,fpromise::promise<> 的模板参数表示返回和错误类型:

fpromise::promise<ValueType, ErrorType>

错误类型可以省略,并采用默认错误类型 void(例如 fpromise::promise<MyValueType> 等同于 fpromise::promise<MyValueType, void>)。

在执行期间,promise 最终必须达到以下状态之一:

  • 成功:处理程序函数或最后一个接续函数(见下文)已返回 fpromise::ok()
  • 错误:处理程序函数或某个接续函数已返回 fpromise::error()并且没有后续接续函数拦截了它。
  • 已放弃:promise 在解析为成功或错误之前已销毁。

.then().and_then().or_else():链接异步块

通常,复杂的任务可以分解为更小、更精细的任务。这些任务中的每项任务都需要异步执行,但如果任务之间存在某种依赖关系,则需要保留它们。这可以通过不同的组合器来实现,例如:

  • 无论任务 1 的状态如何,fpromise::promise::then() 都可用于定义任务依赖项,例如,应先执行任务 1,然后再执行任务 2。上一个任务的结果通过 fpromise::result<ValueType, ErrorType>&const fpromise::result<ValueType, ErrorType>& 类型的参数接收。
auto execute_task_1_then_task_2 =
    fpromise::make_promise([]() -> fpromise::result<ValueType, ErrorType> {
      ...
    }).then([](fpromise::result<ValueType, ErrorType>& result) {
      if (result.is_ok()) {
        ...
      } else {  // result.is_error()
        ...
      }
    });
  • 只有在任务 1 成功的情况下,fpromise::promise::and_then() 才可用于定义任务依赖项。上一个任务的结果通过 ValueType&ValueType& 类型的参数接收。
auto execute_task_1_then_task_2 =
    fpromise::make_promise([]() { ... }).and_then([](ValueType& success_value) {
      ...
    });
  • 只有在任务 1 失败的情况下,fpromise::promise::or_else() 才可用于定义任务依赖项。上一个任务的结果通过 ErrorType&const ErrorType& 类型的参数接收。
auto execute_task_1_then_task_2 =
    fpromise::make_promise([]() { ... }).or_else([](ErrorType& failure_value) {
      ...
    });

fpromise::join_promises()fpromise::join_promise_vector():并行执行

有时,可以执行多个 promise,并且它们之间不存在依赖关系,但聚合结果取决于下一个异步步骤。在本例中,fpromise::join_promises()fpromise::join_promise_vector() 用于联接多个 promise 的结果。

如果变量引用了每个 promise,则使用 fpromise::join_promises()fpromise::join_promises() 支持异构 promise 类型。先前任务的结果通过 std::tuple<...>&const std::tuple<...>& 类型的参数接收。

auto DoImportantThingsInParallel() {
  auto promise1 = FetchStringFromDbAsync("foo");
  auto promise2 = InitializeFrobinatorAsync();
  return fpromise::join_promises(std::move(promise1), std::move(promise2))
      .and_then([](std::tuple<fpromise::result<std::string>,
                              fpromise::result<Frobinator>>& results) {
        return fpromise::ok(std::get<0>(results).value() +
                       std::get<1>(results).value().GetFrobinatorSummary());
      });
}

如果 promise 存储在 std::vector<> 中,则使用 fpromise::join_promise_vector()。这会增加一个限制条件,即所有 promise 都必须是同构的(属于同一类型)。先前任务的结果通过 std::vector<fpromise::result<ValueType, ErrorType>>&const std::vector<fpromise::result<ValueType, ErrorType>>& 类型的参数接收。

auto ConcatenateImportantThingsDoneInParallel() {
  std::vector<fpromise::promise<std::string>> promises;
  promises.push_back(FetchStringFromDbAsync("foo"));
  promises.push_back(FetchStringFromDbAsync("bar"));
  return fpromise::join_promise_vector(std::move(promises))
      .and_then([](std::vector<fpromise::result<std::string>>& results) {
        return fpromise::ok(results[0].value() + "," + results[1].value());
      });
}

return fpromise::make_promise():通过返回新 promise 来链接或分支

将有关将哪个 promise 串联起来的决定推迟到运行时可能会很有用。此方法与语法链(通过使用连续的 .then().and_then().or_else() 调用)执行链式操作相反。

处理程序函数可能会返回一个新的 promise,该 promise 将在处理程序函数返回后进行求值,而不是返回 fpromise::result<...>(使用 fpromise::okfpromise::error)。

fpromise::make_promise(...)
  .then([] (fpromise::result<>& result) {
    if (result.is_ok()) {
      return fpromise::make_promise(...); // Do work in success case.
    } else {
      return fpromise::make_promise(...); // Error case.
    }
  });

此模式还有助于将可能长 promise 的内容分解为更小的可读区块,例如让接续函数返回上例中 DoImportantThingsInParallel() 的结果。

声明中间状态并使其保持活跃状态

某些任务要求状态仅在 promise 本身处于待处理或正在执行状态时保持活跃状态。由于需要共享此状态,因此该状态不适合移至任何给定的 lambda;如果希望其生命周期与 promise 耦合,此状态也不适合将所有权转移到生命周期更长的容器。

虽然不是唯一的解决方案,但同时使用 std::unique_ptr<>std::shared_ptr<> 是常见模式:

std::unique_ptr<>

fpromise::promise<> MakePromise() {
  struct State {
    int i;
  };
  // Create a single std::unique_ptr<> container for an instance of State and
  // capture raw pointers to the state in the handler and continuations.
  //
  // Ownership of the underlying memory is transferred to a lambda passed to
  // `.inspect()`. |state| will die when the returned promise is resolved or is
  // abandoned.
  auto state = std::make_unique<State>();
  state->i = 0;
  return fpromise::make_promise([state = state.get()] { state->i++; })
      .and_then([state = state.get()] { state->i--; })
      .inspect([state = std::move(state)](const fpromise::result<>&) {});
}

std::shared_ptr<>

fpromise::promise<> MakePromise() {
  struct State {
    int i;
  };
  // Rely on shared_ptr's reference counting to destroy |state| when it is safe
  // to do so.
  auto state = std::make_shared<State>();
  state->i = 0;
  return fpromise::make_promise([state] { state->i++; }).and_then([state] {
    state->i--;
  });
}

fpromise::scope:放弃 promise,以避免违反内存安全规定

fpromise::scope 可用于将 fpromise::promise<> 的生命周期与内存中的资源关联起来。例如:

#include <lib/fpromise/scope.h>

class A {
 public:
  fpromise::promise<> MakePromise() {
    // Capturing |this| is dangerous: the returned promise will be scheduled
    // and executed in an unknown context. Use |scope_| to protect against
    // possible memory safety violations.
    //
    // The call to `.wrap_with(scope_)` abandons the promise if |scope_| is
    // destroyed. Since |scope_| and |this| share the same lifecycle, it is safe
    // to capture |this|.
    return fpromise::make_promise([this] {
             // |foo_| is critical to the operation!
             return fpromise::ok(foo_.Frobinate());
           })
        .wrap_with(scope_);
  }

 private:
  Frobinator foo_;
  fpromise::scope scope_;
};

void main() {
  auto a = std::make_unique<A>();
  auto promise = a->MakePromise();
  a.reset();
  // |promise| will not run any more, even if scheduled, protected access to the
  // out-of-scope resources.
}

fpromise::sequencer:在单独的 promise 完成时屏蔽 promise

TODO:您可以使用 .wrap_with(sequencer) 阻止此 promise,以便在使用同一个排序器对象封装的最后一个 promise 完成时阻塞

#include <lib/fpromise/sequencer.h>
// TODO

fpromise::bridge:与基于回调的异步函数集成

TODO:fpromise::bridge 可用于将基于回调的异步函数串联延续

#include <lib/fpromise/bridge.h>
// TODO

fpromise::bridge:分离单个接续链的执行

TODO:fpromise::bridge 也可用于将一个接续链分离为两个可在不同 fpromise::executor 实例上执行的 promise。

常见陷阱

and_thenor_else 序列必须具有兼容的类型

使用 and_then 构建 promise 时,每个连续的连续可能具有不同的 ValueType,但必须具有相同的 ErrorType,因为 and_then 会转发先前的错误,而不使用它们。

使用 or_else 构建 promise 时,每个连续的连续可能具有不同的 ErrorType,但必须具有相同的 ValueType,因为 or_else 会转发先前的值,而不使用这些值。

如需更改序列中间的类型,请使用 then 使用之前的结果并生成所需类型的新结果。

以下示例无法编译,因为上一个 and_then 处理程序返回的错误类型与前一个处理程序的结果不兼容。

auto a = fpromise::make_promise([] {
  // returns fpromise::result<int, void>
  return fpromise::ok(4);
}).and_then([] (const int& value) {
  // returns fpromise::result<float, void>
  return fpromise::ok(value * 2.2f);
}).and_then([] (const float& value) {
  // ERROR!  Prior result had "void" error type but this handler returns const
  // char*.
  if (value >= 0)
    return fpromise::ok(value);
  return fpromise::error("bad value");
}

使用 then 来使用结果并更改其类型:

auto a = fpromise::make_promise([] {
  // returns fpromise::result<int, void>
  return fpromise::ok(4);
}).and_then([] (const int& value) {
  // returns fpromise::result<float, void>
  return fpromise::ok(value * 2.2f);
}).then([] (const fpromise::result<float>& result) -> fpromise::result<float, const char*> {
  if (result.is_ok() && result.value() >= 0)
    return fpromise::ok(value);
  return fpromise::error("bad value");
}

处理程序 / 接续函数可以返回 fpromise::result<> 或新的 fpromise::promise<>,但不能同时返回两者

您可能想要编写一个处理程序,以便在一个条件分支中返回 fpromise::promise<>,在另一个条件分支中返回 fpromise::ok()fpromise::error()。这是非法的,因为编译器无法将 fpromise::result<> 转换为 fpromise::promise<>

权宜解决方法是返回可解析为所需结果的 fpromise::promise<>

auto a = fpromise::make_promise([] {
  if (condition) {
    return MakeComplexPromise();
  }
  return fpromise::make_ok_promise(42);
});

接续签名

您是否遇到过这样的错误消息?

../../sdk/lib/fit-promise/include/lib/fpromise/promise_internal.h:342:5: error: static_assert failed "The provided handler's last argument was expected to be of type V& or const V& where V is the prior result's value type and E is the prior result's error type.  Please refer to the combinator's documentation for
 a list of supported handler function signatures."

或:

../../sdk/lib/fit-promise/include/lib/fpromise/promise.h:288:5: error: static_assert failed due to requirement '::fpromise::internal::is_continuation<fpromise::internal::and_then_continuation<fpromise::promise_impl<fit::function_impl<16, false, fpromise::result<fuchsia::modular::storymodel::StoryModel, void> (fpromise::context &)> >, (lambda at ../../src/modular/bin/sessionmgr/story/model/ledger_story_model_storage.cc:222:17)>, void>::value' "Continuation type is invalid.  A continuation is a callable object with this signature: fpromise::result<V, E>(fpromise::context&)."

这很可能意味着其中一个接续函数的签名无效。不同接续函数的有效签名如下所示:

对于 .then()

.then([] (fpromise::result<V, E>& result) {});
.then([] (const fpromise::result<V, E>& result) {});
.then([] (fpromise::context& c, fpromise::result<V, E>& result) {});
.then([] (fpromise::context& c, const fpromise::result<V, E>& result) {});

对于 .and_then()

.and_then([] (V& success_value) {});
.and_then([] (const V& success_value) {});
.and_then([] (fpromise::context& c, V& success_value) {});
.and_then([] (fpromise::context& c, const V& success_value) {});

对于 .or_else()

.or_else([] (E& error_value) {});
.or_else([] (const E& error_value) {});
.or_else([] (fpromise::context& c, E& error_value) {});
.or_else([] (fpromise::context& c, const E& error_value) {});

对于 .inspect()

.inspect([] (fpromise::result<V, E>& result) {});
.inspect([] (const fpromise::result<V, E>& result) {});

捕获和参数生命周期

promise 由处理程序和接续函数(通常是 lambda)组成。在构建 lambda 捕获列表时必须格外小心,以避免捕获在执行相关处理程序或接续时无效的内存。

例如,此 promise 会捕获在 Foo() 返回时(也就是在调度和执行返回的 promise 时)保证无效的内存。

fpromise::promise<> Foo() {
  int i;
  return fpromise::make_promise([&i] {
    i++;  // |i| is only valid within the scope of Foo().
  });
}

实际代码中的实例更细微。一个不太明显的示例:

fpromise::promise<> Foo() {
  return fpromise::make_promise(
      [i = 0] { return fpromise::make_promise([&i] { i++; }); });
}

fpromise::promise 会快速销毁处理程序和接续函数:最外层的处理程序一旦返回最内层的处理程序,便会被销毁。如需了解在这种情况下使用的正确模式,请参阅上文中的“声明中间状态并使其保持活动状态”。

>>> 个章节

  • 从一种错误类型转换为另一种错误类型
  • fpromise::bridge
  • 常见陷阱: 捕获状态的生命周期