在 Rust 中实现 FIDL 服务器

前提条件

本教程假设您熟悉如何在 GN 中将库的 FIDL Rust 绑定列为依赖项,以及如何将绑定导入到 Rust 代码中,这些内容在 Rust FIDL crate 教程中都有介绍。

概览

本教程介绍了如何实现 FIDL 协议 (fuchsia.examples.Echo) 并在 Fuchsia 上运行该协议。此协议为每种类型提供了一种方法:

  • EchoString 是一种带响应的方法。
  • SendString 是一种无响应的方法。
  • OnString 是事件。
@discoverable
closed protocol Echo {
    strict EchoString(struct {
        value string:MAX_STRING_LENGTH;
    }) -> (struct {
        response string:MAX_STRING_LENGTH;
    });
    strict SendString(struct {
        value string:MAX_STRING_LENGTH;
    });
    strict -> OnString(struct {
        response string:MAX_STRING_LENGTH;
    });
};

如需详细了解 FIDL 方法和消息传递模型,请参阅 FIDL 概念页面。

本文档介绍了如何完成以下任务:

  • 实现 FIDL 协议。
  • 在 Fuchsia 上构建并运行软件包。
  • 提供 FIDL 协议。

本教程首先创建一个组件,该组件提供给 Fuchsia 设备并运行。然后,逐步添加功能以启动和运行服务器。

如果您想自行编写代码,请删除以下目录:

rm -r examples/fidl/rust/server/*

创建组件

如需创建组件,请执行以下操作:

  1. main() 函数添加到 examples/fidl/rust/server/src/main.rs

    fn main() {
      println!("Hello, world!");
    }
    
  2. examples/fidl/rust/server/BUILD.gn 中为服务器声明目标:

    import("//build/rust/rustc_binary.gni")
    
    
    # Declare an executable for the server. This produces a binary with the
    # specified output name that can run on Fuchsia.
    rustc_binary("bin") {
      output_name = "fidl_echo_rust_server"
      edition = "2021"
    
      sources = [ "src/main.rs" ]
    }
    
    # Declare a component for the server, which consists of the manifest and the
    # binary that the component will run.
    fuchsia_component("echo-server") {
      component_name = "echo_server"
      manifest = "meta/server.cml"
      deps = [ ":bin" ]
    }
    
    # Declare a package that contains a single component, our server.
    fuchsia_package("echo-rust-server") {
      deps = [ ":echo-server" ]
    }
    

    要启动并运行服务器组件,需要定义三个目标:

    • 构建为在 Fuchsia 上运行的服务器的原始可执行文件。
    • 设置为仅运行服务器可执行文件的组件,该文件使用组件的清单文件进行描述。
    • 然后,将该组件放入软件包中,软件包是 Fuchsia 上软件分发的单元。在本例中,该软件包只包含一个组件。

    如需详细了解软件包、组件以及如何构建它们,请参阅构建组件页面。

  3. examples/fidl/rust/server/meta/server.cml 中添加组件清单:

    {
        include: [ "syslog/client.shard.cml" ],
    
        // Information about the program to run.
        program: {
            // Use the built-in ELF runner.
            runner: "elf",
    
            // The binary to run for this component.
            binary: "bin/fidl_echo_rust_server",
        },
    
        // Capabilities provided by this component.
        capabilities: [
            { protocol: "fuchsia.examples.Echo" },
        ],
        expose: [
            {
                protocol: "fuchsia.examples.Echo",
                from: "self",
            },
        ],
    }
    
    
  4. 将服务器添加到 build 配置中:

    fx set core.x64 --with //examples/fidl/rust/server:echo-rust-server
    
  5. 构建 Fuchsia 映像:

    fx build
    

实现服务器

首先,您将实现 Echo 协议的行为。在 Rust 中,这表示为可以处理协议的关联请求流类型(在本例中为 EchoRequestStream)的代码。此类型是 Echo 请求的流,即它实现了 futures::Stream<Item = Result<EchoRequest, fidl::Error>>

您将实现 run_echo_server() 来处理请求流,这是一个用于处理传入服务请求的异步函数。它会返回一个 Future,该 Future 会在客户端通道关闭后完成。

添加依赖项

  1. 导入所需的依赖项:

    // we'll use anyhow to propagate errors that occur when handling the request stream
    use anyhow::{Context as _, Error};
    // the server will need to handle an EchoRequestStream
    use fidl_fuchsia_examples::{EchoRequest, EchoRequestStream};
    // import the futures prelude, which includes things like the Future and Stream traits
    use futures::prelude::*;
    
  2. 将它们作为 build 依赖项添加到 rustc_binary 目标。deps 字段应如下所示:

    deps = [
      "//examples/fidl/fuchsia.examples:fuchsia.examples_rust",
      "//third_party/rust_crates:anyhow",
      "//third_party/rust_crates:futures",
    ]
    

定义 run_echo_server

// An implementation of the Echo stream, which handles a stream of EchoRequests
async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
    stream
        .map(|result| result.context("failed request"))
        .try_for_each(|request| async move {
            match request {
                // Handle each EchoString request by responding with the request
                // value
                EchoRequest::EchoString { value, responder } => {
                    println!("Received EchoString request for string {:?}", value);
                    responder.send(&value).context("error sending response")?;
                    println!("Response sent successfully");
                }
                // Handle each SendString request by sending a single OnString
                // event with the request value
                EchoRequest::SendString { value, control_handle } => {
                    println!("Received SendString request for string {:?}", value);
                    control_handle.send_on_string(&value).context("error sending event")?;
                    println!("Event sent successfully");
                }
            }
            Ok(())
        })
        .await
}

该实现包含以下元素:

  • 该代码通过对每个结果使用 .context() 方法附加上下文,将请求流中的 fidl:Error 转换为 anyhow::Error

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    

    在此阶段,Result<EchoRequest, fidl::Error> 流会变为 Result<EchoRequest, anyhow::Error> 流。

  • 然后,该函数会对生成的流调用 try_for_each,后者会返回一个 Future。此方法会展开数据流中的 Result;任何失败都会导致 Future 立即返回该错误,并且任何成功的内容都会传递给闭包。同样,如果闭包的返回值解析为失败,生成的 Future 将立即返回该错误:

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    
  • 闭包的内容通过匹配传入的 EchoRequest 来确定它们是哪种类型的请求,从而进行相应处理:

    // An implementation of the Echo stream, which handles a stream of EchoRequests
    async fn run_echo_server(stream: EchoRequestStream) -> Result<(), Error> {
        stream
            .map(|result| result.context("failed request"))
            .try_for_each(|request| async move {
                match request {
                    // Handle each EchoString request by responding with the request
                    // value
                    EchoRequest::EchoString { value, responder } => {
                        println!("Received EchoString request for string {:?}", value);
                        responder.send(&value).context("error sending response")?;
                        println!("Response sent successfully");
                    }
                    // Handle each SendString request by sending a single OnString
                    // event with the request value
                    EchoRequest::SendString { value, control_handle } => {
                        println!("Received SendString request for string {:?}", value);
                        control_handle.send_on_string(&value).context("error sending event")?;
                        println!("Event sent successfully");
                    }
                }
                Ok(())
            })
            .await
    }
    

    此实现通过回传输入来处理 EchoString 请求,并通过发送 OnString 事件来处理 SendString 请求。由于 SendString 是一种“发后不理”方法,因此请求枚举变体附带一个控制句柄,可用于与服务器进行通信。

    在这两种情况下,通过添加上下文并使用 ? 运算符,系统会传播发送消息回客户端时发生的错误。如果成功到达闭包的末尾,则会返回 Ok(())

  • 最后,服务器函数会将从 try_for_each 返回的 Future await 到完成状态,这将对每个传入请求调用闭包,并在处理完所有请求或遇到任何错误时返回。

您可以通过运行以下命令来验证实现是否正确:

fx build

提供协议

现在,您已定义用于处理传入请求的代码,接下来需要监听与 Echo 服务器的传入连接。为此,需要请求组件管理器将 Echo 协议公开给其他组件。然后,组件管理器会将对回声协议的所有请求路由到我们的服务器。

为了满足这些请求,组件管理器需要协议的名称,以及在有任何传入请求连接到与指定名称匹配的协议时应调用的处理程序。

添加依赖项

  1. 导入所需的依赖项:

    // Import the Fuchsia async runtime in order to run the async main function
    use fuchsia_async as fasync;
    // ServiceFs is a filesystem used to connect clients to the Echo service
    use fuchsia_component::server::ServiceFs;
    
  2. 将它们作为 build 依赖项添加到 rustc_binary 目标中。完整的目标如下所示:

    rustc_binary("bin") {
      name = "fidl_echo_rust_server"
      edition = "2021"
    
      deps = [
        "//examples/fidl/fuchsia.examples:fuchsia.examples_rust",
        "//src/lib/fuchsia",
        "//src/lib/fuchsia-component",
        "//third_party/rust_crates:anyhow",
        "//third_party/rust_crates:futures",
      ]
    
      sources = [ "src/main.rs" ]
    }
    
    

定义 main 函数

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

主函数是异步的,因为它包含监听传入到 Echo 服务器的连接。fuchsia::main 属性告知 fuchsia 异步运行时在单个线程上运行 main Future 以完成操作。

main 也会返回 Result<(), Error>。如果其中一条 ? 行的结果从 main 返回 Error,系统将输出 Debug 错误,并且程序会返回一个表示失败的状态代码。

初始化 ServiceFs

获取 ServiceFs 实例,该实例表示包含各种服务的文件系统。由于服务器将以单线程方式运行,因此请使用 ServiceFs::new_local() 而非 ServiceFs::new()(后者支持多线程)。

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

添加 Echo FIDL 服务

让组件管理器公开 Echo FIDL 服务。此函数调用分为两部分:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}
  • 组件管理器必须知道如何处理传入的连接请求。可以通过传入一个闭包指定,该闭包接受 fidl::endpoints::RequestStream 并随其返回一些新值。例如,传入 |stream: EchoRequestStream| stream 的闭包完全有效。常见模式是定义服务器提供的可能服务的枚举,在此示例中:

    enum IncomingService {
        // Host a service protocol.
        Echo(EchoRequestStream),
        // ... more services here
    }
    

    然后将枚举变体“构造函数”作为闭包传递。如果提供了多项服务,则会产生通用的返回类型(IncomingService 枚举)。在监听传入连接时,所有 add_fidl_service 闭包的返回值都将成为 ServiceFs 流中的元素。

  • 组件管理器还必须知道此服务将在哪里提供。由于这是出站服务(即提供给其他组件的服务),因此该服务必须在 /svc 目录中添加路径。add_fidl_service 会通过获取与闭包输入参数关联的 SERVICE_NAME 来隐式获取此路径。在本例中,闭包参数 (IncomingService::Echo) 有一个类型为 EchoRequestStream 的输入参数,该参数有一个关联的 SERVICE_NAME(即 "fuchsia.examples.Echo")。因此,此调用会在 /svc/fuchsia.examples.Echo 中添加一个条目,并且客户端需要搜索名为 "fuchsia.examples.Echo" 的服务才能连接到此服务器。

提供传出目录

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

此调用将 ServiceFs 绑定到组件的 DirectoryRequest 启动句柄,并监听传入的连接请求。请注意,由于这会从进程的句柄表中移除句柄,因此每个进程只能调用此函数一次。如果您想向其他渠道提供 ServiceFs,可以使用 serve_connection 函数。

协议打开生命周期中对此过程进行了详细说明。

监听传入连接

运行 ServiceFs 以完成操作,以监听传入连接:

#[fuchsia::main]
async fn main() -> Result<(), Error> {
    // Initialize the outgoing services provided by this component
    let mut fs = ServiceFs::new_local();
    fs.dir("svc").add_fidl_service(IncomingService::Echo);

    // Serve the outgoing services
    fs.take_and_serve_directory_handle()?;

    // Listen for incoming requests to connect to Echo, and call run_echo_server
    // on each one
    println!("Listening for incoming connections...");
    const MAX_CONCURRENT: usize = 10_000;
    fs.for_each_concurrent(MAX_CONCURRENT, |IncomingService::Echo(stream)| {
        run_echo_server(stream).unwrap_or_else(|e| println!("{:?}", e))
    })
    .await;

    Ok(())
}

这会运行 ServiceFs Future,并最多同时处理 10,000 个传入请求。传递给此调用的闭包是用于处理传入请求的处理程序 - ServiceFs 会先将传入连接与提供给 add_fidl_service 的闭包进行匹配,然后对结果(即 IncomingService)调用处理程序。处理程序会接受 IncomingService,并对内部请求流调用 run_echo_server 来处理传入的 Echo 请求。

这里处理的请求分为两种。ServiceFs 处理的请求流包含连接到 Echo 服务器的请求(即每个客户端在连接到服务器时发出一次此类请求),而 run_echo_server 处理的请求流是 Echo 协议上的请求(即每个客户端可以向服务器发出任意数量的 EchoStringSendString 请求)。许多客户端可以同时请求连接到 Echo 服务器,因此系统会并发处理此请求流。不过,针对单个客户端的所有请求都是顺序发生的,因此并行处理请求没有任何好处。

测试服务器

重建:

fx build

然后,运行服务器组件:

ffx component run /core/ffx-laboratory:echo_server fuchsia-pkg://fuchsia.com/echo-rust-server#meta/echo_server.cm

`fuchsia-pkg://` scheme。

您应该会在设备日志 (ffx log) 中看到类似如下所示的输出:

[ffx-laboratory:echo_server][][I] Listening for incoming connections...

服务器现已运行,并等待传入请求。下一步是编写一个用于发送 Echo 协议请求的客户端。目前,您只需终止服务器组件即可:

ffx component destroy /core/ffx-laboratory:echo_server