在 Rust 中实现 FIDL 服务器

前提条件

本教程假定您熟悉如何在 GN 中列出库的 FIDL Rust 绑定作为依赖项,以及如何将绑定导入 Rust 代码,Rust FIDL crate 教程中介绍了这些内容。

概览

本教程介绍了如何实现 FIDL 协议 (fuchsia.examples.Echo) 并在 Fuchsia 上运行该协议。此协议每种方法都有一个方法:

  • EchoString 是具有响应的方法。
  • SendString 是没有响应的方法。
  • OnString是一个事件。
@discoverable
closed protocol Echo {
    strict EchoString(struct {
        value string:MAX_STRING_LENGTH;
    }) -> (struct {
        response string:MAX_STRING_LENGTH;
    });
    strict SendString(struct {
        value string:MAX_STRING_LENGTH;
    });
    strict -> OnString(struct {
        response string:MAX_STRING_LENGTH;
    });
};

如需详细了解 FIDL 方法和消息传递模型,请参阅 FIDL 概念页面。

本文档介绍了如何完成以下任务:

  • 实现 FIDL 协议。
  • 在 Fuchsia 上构建并运行软件包。
  • 提供 FIDL 协议。

本教程首先创建一个提供给 Fuchsia 设备并运行的组件。然后,它会逐步添加用于启动并运行服务器的功能。

如果您要自行编写代码,请删除以下目录:

rm -r examples/fidl/rust/server/*

创建组件

若要创建组件,请执行以下操作:

  1. examples/fidl/rust/server/src/main.rs 添加 main() 函数:

    fn main() {
      println!("Hello, world!");
    }
    
  2. examples/fidl/rust/server/BUILD.gn 中声明服务器的目标:

    import("//build/rust/rustc_binary.gni")
    
    
    # Declare an executable for the server. This produces a binary with the
    # specified output name that can run on Fuchsia.
    rustc_binary("bin") {
      output_name = "fidl_echo_rust_server"
      edition = "2021"
    
      sources = [ "src/main.rs" ]
    }
    
    # Declare a component for the server, which consists of the manifest and the
    # binary that the component will run.
    fuchsia_component("echo-server") {
      component_name = "echo_server"
      manifest = "meta/server.cml"
      deps = [ ":bin" ]
    }
    
    # Declare a package that contains a single component, our server.
    fuchsia_package("echo-rust-server") {
      deps = [ ":echo-server" ]
    }
    

    为使服务器组件启动并运行,需要定义三个目标:

    • 用于在 Fuchsia 上运行的服务器的原始可执行文件。
    • 一种组件,设置为简单运行服务器可执行文件,使用组件的清单文件对其进行描述。
    • 然后将该组件放入软件包(即 Fuchsia 上软件分发的单元)中。在本例中,该软件包仅包含一个组件。

    如需详细了解软件包、组件以及如何构建它们,请参阅构建组件页面。

  3. examples/fidl/rust/server/meta/server.cml 中添加组件清单:

    {
        include: [ "syslog/client.shard.cml" ],
    
        // Information about the program to run.
        program: {
            // Use the built-in ELF runner.
            runner: "elf",
    
            // The binary to run for this component.
            binary: "bin/fidl_echo_rust_server",
        },
    
        // Capabilities provided by this component.
        capabilities: [
            { protocol: "fuchsia.examples.Echo" },
        ],
        expose: [
            {
                protocol: "fuchsia.examples.Echo",
                from: "self",
            },
        ],
    }
    
    
  4. 将服务器添加到您的 build 配置中:

    fx set core.x64 --with //examples/fidl/rust/server:echo-rust-server
    
  5. 构建 Fuchsia 映像:

    fx build
    

实现服务器

首先,您将实现 Echo 协议的行为。在 Rust 中,这表示为可以处理协议的关联请求流类型的代码,在本例中为 EchoRequestStream。此类型是 Echo 请求流,即它实现了 futures::Stream<Item = Result<EchoRequest, fidl::Error>>

您将实现 run_echo_server() 来处理请求流,这是一个异步函数,用于处理传入的服务请求。它会返回一个在客户端通道关闭后完成的 Future。

添加依赖项

  1. 导入所需的依赖项:

    // we'll use anyhow to propagate errors that occur when handling the request stream
    use anyhow::{Context as _, Error};
    // the server will need to handle an EchoRequestStream
    use fidl_fuchsia_examples::{EchoRequest, EchoRequestStream};
    // import the futures prelude, which includes things like the Future and Stream traits
    use futures::prelude::*;
    
  2. 将它们作为 build 依赖项添加到 rustc_binary 目标中。deps 字段应如下所示:

    deps = [
      "//examples/fidl/fuchsia.examples:fuchsia.examples_rust",
      "//third_party/rust_crates:anyhow",
      "//third_party/rust_crates:futures",
    ]
    

定义 run_echo_server

// An implementation of the Echo stream, which handles a stream of EchoRequests
async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
    stream
        .map(|result| result.context("failed request"))
        .try_for_each(|request| async move {
            match request {
                // Handle each EchoString request by responding with the request
                // value
                EchoRequest::EchoString { value, responder } => {
                    println!("Received EchoString request for string {:?}", value);
                    responder.send(&value).context("error sending response")?;
                    println!("Response sent successfully");
                }
                // Handle each SendString request by sending a single OnString
                // event with the request value
                EchoRequest::SendString { value, control_handle } => {
                    println!("Received SendString request for string {:?}", value);
                    control_handle.send_on_string(&value).context("error sending event")?;
                    println!("Event sent successfully");
                }
            }
            Ok(())
        })
        .await
}

该实现包括以下元素:

  • 该代码通过对每个结果使用 .context() 方法附加上下文,将请求流中的 fidl:Error 转换为 anyhow::Error

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    

    在此阶段,Result<EchoRequest, fidl::Error> 的流会变为 Result<EchoRequest, anyhow::Error> 的流。

  • 然后,该函数对生成的流调用 try_for_each,以返回期值。此方法会解封数据流中的 Result - 任何失败都会导致 Future 立即返回并显示此错误,并且任何成功的内容都会传递给闭包。同样,如果闭包的返回值解析为失败,则生成的 Future 将立即返回并显示该错误:

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    
  • 闭包的内容通过匹配传入的EchoRequest来确定它们所属的类型,以此来处理传入的内容:

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    

    此实现通过回显输入来处理 EchoString 请求,通过发送 OnString 事件来处理 SendString 请求。由于 SendString 是一种触发即忘记方法,因此请求枚举变体附带一个控制句柄,该句柄可用于传回服务器。

    在这两种情况下,通过添加上下文和使用 ? 运算符,将消息发送回客户端的错误得以传播。如果成功到达闭包的末尾,则返回 Ok(())

  • 最后,服务器函数会对 try_for_each 返回的 Future 执行 await 操作以完成整个过程,该函数将对每个传入请求调用闭包,并在所有请求都已处理或遇到任何错误时返回结果。

您可以通过运行以下命令来验证实现是否正确:

fx build

提供协议

现在,您已经定义了用于处理传入请求的代码,接下来您需要监听到 Echo 服务器的传入连接。方法是要求组件管理器向其他组件公开 Echo 协议。然后,组件管理器会将对 echo 协议的任何请求路由到我们的服务器。

为了执行这些请求,组件管理器需要协议的名称以及在它有任何传入请求来连接到与指定名称匹配的协议时应调用的处理程序。

添加依赖项

  1. 导入所需的依赖项:

    // Import the Fuchsia async runtime in order to run the async main function
    use fuchsia_async as fasync;
    // ServiceFs is a filesystem used to connect clients to the Echo service
    use fuchsia_component::server::ServiceFs;
    
  2. 将它们作为 build 依赖项添加到 rustc_binary 目标中。完整的目标如下所示:

    rustc_binary("bin") {
      name = "fidl_echo_rust_server"
      edition = "2021"
    
      deps = [
        "//examples/fidl/fuchsia.examples:fuchsia.examples_rust",
        "//src/lib/fuchsia",
        "//src/lib/fuchsia-component",
        "//third_party/rust_crates:anyhow",
        "//third_party/rust_crates:futures",
      ]
    
      sources = [ "src/main.rs" ]
    }
    
    

定义 main 函数

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

main 函数是异步的,因为它包括监听到 Echo 服务器的传入连接。fuchsia::main 属性告知 fuchsia 异步运行时在单个线程上运行 main Future 以完成操作。

main 也会返回 Result<(), Error>。如果其中一行 ? 代码从 main 返回 Error,系统将输出 Debug 错误,并且程序将返回表示失败的状态代码。

初始化 ServiceFs

获取 ServiceFs 的实例,该实例表示包含各种服务的文件系统。由于服务器将以单线程运行,因此请使用 ServiceFs::new_local() 而不是 ServiceFs::new()(后者支持多线程)。

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

添加 Echo FIDL 服务

要求组件管理器提供 Echo FIDL 服务。此函数调用包含两个部分:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}
  • 组件管理器必须知道如何处理传入的连接请求。可以通过传入接受 fidl::endpoints::RequestStream 并随其返回一些新值的闭包来指定这一点。例如,传入 |stream: EchoRequestStream| stream 的闭包是完全有效的。常见模式是定义服务器提供的可能服务的枚举,在此示例中:

    enum IncomingService {
        // Host a service protocol.
        Echo(EchoRequestStream),
        // ... more services here
    }
    

    然后将枚举变体“构造函数”作为闭包传递。如果有多个服务,这会导致通用的返回类型(IncomingService 枚举)。在监听传入连接时,所有 add_fidl_service 闭包的返回值都将成为 ServiceFs 流中的元素。

  • 组件管理器还必须知道这项服务的发布位置。 由于这是一项传出服务(即提供给其他组件的服务),因此该服务必须在 /svc 目录内添加路径。add_fidl_service 通过接受与闭包输入参数关联的 SERVICE_NAME 隐式获取此路径。在本例中,闭包参数 (IncomingService::Echo) 有一个类型为 EchoRequestStream 的输入参数,其关联的 SERVICE_NAME"fuchsia.examples.Echo"。因此,此调用会在 /svc/fuchsia.examples.Echo 处添加一个条目,客户端将需要搜索名为 "fuchsia.examples.Echo" 的服务才能连接到此服务器。

提供传出目录

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

此调用会将 ServiceFs 绑定到组件的 DirectoryRequest 启动句柄,并监听传入的连接请求。请注意,由于此操作会从进程的句柄表中移除句柄,因此每个进程只能调用一次此函数。如果您想为其他频道提供 ServiceFs,可以使用 serve_connection 函数。

开放协议的生命周期中详细介绍了此过程。

监听传入连接

运行 ServiceFs 以完成操作,以监听传入连接:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

这会运行 ServiceFs Future,同时处理多达 10,000 个传入请求。传递给此调用的闭包是用于传入请求的处理程序 - ServiceFs 会先将传入连接与提供给 add_fidl_service 的闭包匹配,然后对结果调用处理程序(这是一个 IncomingService)。该处理程序接受 IncomingService,并对内部请求流调用 run_echo_server 来处理传入的 Echo 请求。

此处处理的请求分为两类。由 ServiceFs 处理的请求流包括连接到 Echo 服务器的请求(即每个客户端在连接到服务器时将发出一次此类请求),而由 run_echo_server 处理的请求流则是基于 Echo 协议的请求(即每个客户端可以向服务器发出任意数量的 EchoStringSendString 请求)。许多客户端可以同时请求连接到 Echo 服务器,因此系统会并发处理此类请求流。但是,单个客户端的所有请求都是按顺序进行的,因此并发处理请求没有好处。

测试服务器

重建:

fx build

然后,运行服务器组件:

ffx component run /core/ffx-laboratory:echo_server fuchsia-pkg://fuchsia.com/echo-rust-server#meta/echo_server.cm

您应该会在设备日志 (ffx log) 中看到类似于以下内容的输出:

[ffx-laboratory:echo_server][][I] Listening for incoming connections...

服务器现在正在运行并在等待传入请求。下一步是编写一个发送 Echo 协议请求的客户端。目前,您可以直接终止服务器组件:

ffx component destroy /core/ffx-laboratory:echo_server