在 Rust 中请求流水线

前提条件

在本教程中,您将了解请求流水线模式及其优势。本教程假定您已熟悉编写和运行 FIDL 客户端和服务器的基础知识,这些知识在 Rust 快速入门教程中有所介绍。

概览

在 Fuchsia 上使用 FIDL 的一个常见方面是在协议之间传递协议本身。许多 FIDL 消息都包含通道的客户端端或服务器端,其中通道用于通过不同的 FIDL 协议进行通信。在这种情况下,客户端端表示通道的远程端实现了指定协议,而服务器端表示远程端正在请求指定协议。客户端结束和服务器结束的另一组术语是协议和协议请求。

本教程涵盖以下内容:

  • 这些客户端和服务器的用法(无论是在 FIDL 中还是在 Rust FIDL 绑定中)都已结束。
  • 请求流水线模式及其优势。

本教程的完整示例代码位于 //examples/fidl/rust/request_pipelining

FIDL 协议

本教程实现了 fuchsia.examples 库中的 EchoLauncher 协议:

@discoverable
closed protocol EchoLauncher {
    strict GetEcho(struct {
        echo_prefix string:MAX_STRING_LENGTH;
    }) -> (resource struct {
        response client_end:Echo;
    });
    strict GetEchoPipelined(resource struct {
        echo_prefix string:MAX_STRING_LENGTH;
        request server_end:Echo;
    });
};

此协议可让客户端检索 Echo 协议的实例。客户端可以指定一个前缀,生成的 Echo 实例会将该前缀添加到每个响应中。

您可以使用以下两种方法来完成此操作:

  • GetEcho:将前缀作为请求,并使用连接到 Echo 协议实现的渠道的客户端端进行响应。在收到响应中的客户端端点后,客户端可以使用该客户端端点开始通过 Echo 协议发出请求。
  • GetEchoPipelined:将渠道的前缀和服务器端作为请求,并将 Echo 的实现绑定到该请求。假设发出请求的客户端已持有客户端端,并且会在调用 GetEchoPipeliend 后开始在该渠道上发出 Echo 请求。

顾名思义,后者使用一种称为协议请求流水线的模式,是首选方法。本教程将实现这两种方法。

实现服务器

实现 Echo 协议

Echo 实现允许指定前缀,以便区分 Echo 服务器的不同实例:

// An Echo implementation that adds a prefix to every response
async fn run_echo_server(stream: EchoRequestStream, prefix: &str) -> Result<(), Error> {
    stream
        .map(|result| result.context("failed request"))
        .try_for_each(|request| async move {
            match request {
                // The SendString request is not used in this example, so just
                // ignore it
                EchoRequest::SendString { value: _, control_handle: _ } => {}
                EchoRequest::EchoString { value, responder } => {
                    println!("Got echo request for prefix {}", prefix);
                    let response = format!("{}: {}", prefix, value);
                    responder.send(&response).context("error sending response")?;
                }
            }
            Ok(())
        })
        .await
}

由于客户端仅使用 EchoString,因此 SendString 处理程序为空。

实现 EchoLauncher 协议

总体结构与 Echo 实现类似,但不同之处在于,使用的是 try_for_each_concurrent 而不是 try_for_each。此示例中的客户端启动了两个 Echo 实例,因此,使用并发版本可让对 run_echo_server 的两次调用并发运行:

// The EchoLauncher implementation that launches Echo servers with the specified
// prefix
async fn run_echo_launcher_server(stream: EchoLauncherRequestStream) -> Result<(), Error> {
    // Currently the client only connects at most two Echo clients for each EchoLauncher
    stream
        .map(|result| result.context("request error"))
        .try_for_each_concurrent(2, |request| async move {
            let (echo_prefix, server_end) = match request {
                // In the non pipelined case, we need to initialize the
                // communication channel ourselves
                EchoLauncherRequest::GetEcho { echo_prefix, responder } => {
                    println!("Got non pipelined request");
                    let (client_end, server_end) = create_endpoints::<EchoMarker>();
                    responder.send(client_end)?;
                    (echo_prefix, server_end)
                }
                // In the pipelined case, the client is responsible for
                // initializing the channel, and passes the server its end of
                // the channel
                EchoLauncherRequest::GetEchoPipelined {
                    echo_prefix,
                    request,
                    control_handle: _,
                } => {
                    println!("Got pipelined request");
                    (echo_prefix, request)
                }
            };
            // Run the Echo server with the specified prefix
            run_echo_server(server_end.into_stream(), &echo_prefix).await
        })
        .await
}

这两个 EchoLauncher 方法都是通过在渠道的服务器端调用 run_echo_server 来处理的。不同之处在于,在 GetEcho 中,服务器负责初始化通道 - 它使用一端作为服务器端,并将另一端发送回客户端。在 GetEchoPipelined 中,服务器端作为请求的一部分提供,因此服务器无需执行额外的工作,也不需要响应。

// The EchoLauncher implementation that launches Echo servers with the specified
// prefix
async fn run_echo_launcher_server(stream: EchoLauncherRequestStream) -> Result<(), Error> {
    // Currently the client only connects at most two Echo clients for each EchoLauncher
    stream
        .map(|result| result.context("request error"))
        .try_for_each_concurrent(2, |request| async move {
            let (echo_prefix, server_end) = match request {
                // In the non pipelined case, we need to initialize the
                // communication channel ourselves
                EchoLauncherRequest::GetEcho { echo_prefix, responder } => {
                    println!("Got non pipelined request");
                    let (client_end, server_end) = create_endpoints::<EchoMarker>();
                    responder.send(client_end)?;
                    (echo_prefix, server_end)
                }
                // In the pipelined case, the client is responsible for
                // initializing the channel, and passes the server its end of
                // the channel
                EchoLauncherRequest::GetEchoPipelined {
                    echo_prefix,
                    request,
                    control_handle: _,
                } => {
                    println!("Got pipelined request");
                    (echo_prefix, request)
                }
            };
            // Run the Echo server with the specified prefix
            run_echo_server(server_end.into_stream(), &echo_prefix).await
        })
        .await
}

提供 EchoLauncher 协议

主循环应与服务器教程中的主循环相同,但提供的是 EchoLauncher 而不是 Echo

enum IncomingService {
    EchoLauncher(EchoLauncherRequestStream),
}

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::EchoLauncher);
    fs.take_and_serve_directory_handle()?;

    const MAX_CONCURRENT: usize = 1000;
    let fut = fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::EchoLauncher(stream)| {
        run_echo_launcher_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    });

    println!("Running echo launcher server");
    fut.await;
    Ok(())
}

构建服务器

(可选)如需检查是否一切正常,请尝试构建服务器:

  1. 配置 GN build 以包含服务器:

    fx set core.x64 --with //examples/fidl/rust/request_pipelining/server:echo-server
  2. 构建 Fuchsia 映像:

    fx build

实现客户端

连接到 EchoLauncher 服务器后,客户端代码使用 GetEcho 连接到 Echo 的一个实例,使用 GetEchoPipelined 连接到另一个实例,然后对每个实例发出 EchoString 请求。

以下是非流水线代码:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    let echo_launcher =
        connect_to_protocol::<EchoLauncherMarker>().context("Failed to connect to echo service")?;

    // Create a future that obtains an Echo protocol using the non-pipelined
    // GetEcho method
    let non_pipelined_fut = async {
        let client_end = echo_launcher.get_echo("not pipelined").await?;
        // "Upgrade" the client end in the response into an Echo proxy, and
        // make an EchoString request on it
        let proxy = client_end.into_proxy();
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val)).await
    };

    // Create a future that obtains an Echo protocol using the pipelined GetEcho
    // method
    let (proxy, server_end) = create_proxy::<EchoMarker>();
    echo_launcher.get_echo_pipelined("pipelined", server_end)?;
    // We can make a request to the server right after sending the pipelined request
    let pipelined_fut =
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val));

    // Run the two futures to completion
    let (non_pipelined_result, pipelined_result) = join!(non_pipelined_fut, pipelined_fut);
    pipelined_result?;
    non_pipelined_result?;
    Ok(())
}

此代码将两个 future 链接在一起。首先,它会向客户端发出 GetEcho 请求。然后,它会获取该 future 的结果,并使用该结果创建一个客户端对象 (proxy),调用 EchoString,然后使用 await 阻塞结果。

尽管必须先初始化通道,但流水线代码要简单得多:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    let echo_launcher =
        connect_to_protocol::<EchoLauncherMarker>().context("Failed to connect to echo service")?;

    // Create a future that obtains an Echo protocol using the non-pipelined
    // GetEcho method
    let non_pipelined_fut = async {
        let client_end = echo_launcher.get_echo("not pipelined").await?;
        // "Upgrade" the client end in the response into an Echo proxy, and
        // make an EchoString request on it
        let proxy = client_end.into_proxy();
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val)).await
    };

    // Create a future that obtains an Echo protocol using the pipelined GetEcho
    // method
    let (proxy, server_end) = create_proxy::<EchoMarker>();
    echo_launcher.get_echo_pipelined("pipelined", server_end)?;
    // We can make a request to the server right after sending the pipelined request
    let pipelined_fut =
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val));

    // Run the two futures to completion
    let (non_pipelined_result, pipelined_result) = join!(non_pipelined_fut, pipelined_fut);
    pipelined_result?;
    non_pipelined_result?;
    Ok(())
}

使用 create_proxy,这是用于创建通道两端并将一端转换为代理的快捷方式。在调用 GetEchoPipelined 后,客户端可以立即发出 EchoString 请求。

最后,同时运行与非流水线式调用和流水线式调用对应的两个 future,以查看哪个先完成:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    let echo_launcher =
        connect_to_protocol::<EchoLauncherMarker>().context("Failed to connect to echo service")?;

    // Create a future that obtains an Echo protocol using the non-pipelined
    // GetEcho method
    let non_pipelined_fut = async {
        let client_end = echo_launcher.get_echo("not pipelined").await?;
        // "Upgrade" the client end in the response into an Echo proxy, and
        // make an EchoString request on it
        let proxy = client_end.into_proxy();
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val)).await
    };

    // Create a future that obtains an Echo protocol using the pipelined GetEcho
    // method
    let (proxy, server_end) = create_proxy::<EchoMarker>();
    echo_launcher.get_echo_pipelined("pipelined", server_end)?;
    // We can make a request to the server right after sending the pipelined request
    let pipelined_fut =
        proxy.echo_string("hello").map_ok(|val| println!("Got echo response {}", val));

    // Run the two futures to completion
    let (non_pipelined_result, pipelined_result) = join!(non_pipelined_fut, pipelined_fut);
    pipelined_result?;
    non_pipelined_result?;
    Ok(())
}

构建客户端

(可选)如需检查是否正确,请尝试构建客户端:

  1. 配置 GN build 以包含服务器:

    fx set core.x64 --with //examples/fidl/rust/request_pipelining/client:echo-client
  2. 构建 Fuchsia 映像:

    fx build

运行示例代码

在本教程中,提供了一个 组件来声明 fuchsia.examples.Echofuchsia.examples.EchoLauncher 的相应功能和路由。

中探索 realm 组件的完整源代码
  1. 配置 build 以包含提供的软件包,该软件包包含回声 realm、服务器和客户端:

    fx set core.x64 --with //examples/fidl/rust:echo-launcher-rust
  2. 构建 Fuchsia 映像:

    fx build
  3. 运行 echo_realm 组件。这会创建客户端和服务器组件实例,并路由功能:

    ffx component run /core/ffx-laboratory:echo_realm fuchsia-pkg://fuchsia.com/echo-launcher-rust#meta/echo_realm.cm
  4. 启动 echo_client 实例:

    ffx component start /core/ffx-laboratory:echo_realm/echo_client

当客户端尝试连接到 EchoLauncher 协议时,服务器组件会启动。您应该会在设备日志 (ffx log) 中看到类似如下所示的输出:

[echo_server][][I] Running echo launcher server
[echo_server][][I] Got pipelined request
[echo_server][][I] Got echo request for prefix pipelined
[echo_server][][I] Got non pipelined request
[echo_client][][I] Got echo response pipelined: hello
[echo_server][][I] Got echo request for prefix not pipelined
[echo_client][][I] Got echo response not pipelined: hello

根据打印顺序,可以看出流水线式处理更快。流水线处理情况下的回显响应会先到达,即使非流水线处理请求先发送也是如此,因为请求流水线处理可节省客户端与服务器之间的一次往返。请求流水线处理还可以简化代码。

如需进一步了解协议请求流水线,包括如何处理可能失败的协议请求,请参阅 FIDL API 评分标准

终止 realm 组件以停止执行并清理组件实例:

ffx component destroy /core/ffx-laboratory:echo_realm