首页 > 解决方案 > 如何异步检索数据并使用基于 Tokio 的回显服务器对其进行修改?

问题描述

我正在开发一个从 TCP 获取数据并将一些逻辑应用于该数据的回显服务器。例如,如果客户端数据按照hello我想要的方式进入,则将其响应为hello from server.

我可以使用该copy功能转发输入数据,但这在我的情况下没有用。

这是我正在处理的起始代码:

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::stream::Stream;
use futures::Future;
use std::net::SocketAddr;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::io::copy;
use tokio_io::AsyncRead;

fn main() {
    let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let socket = TcpListener::bind(&addr, &handle).unwrap();
    println!("Listening on: {}", addr);

    let done = socket.incoming().for_each(move |(socket, addr)| {
        let (reader, writer) = socket.split();
        let amt = copy(reader, writer);

        let msg = amt.then(move |result| {
            match result {
                Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
                Err(e) => println!("error on {}: {}", addr, e),
            }

            Ok(())
        });

        handle.spawn(msg);

        Ok(())
    });

    core.run(done).unwrap();
}

我知道我需要添加一些逻辑而不是这个复制功能,但是如何?

let amt = copy(reader, writer);

标签: asynchronousrustrust-tokio

解决方案


从某种意义上说,回显服务器有点特殊,即来自客户端的一个“请求”紧随其后的是来自服务器的一个响应。此类用例的一个非常好的示例是 tokio 的TinyDB 示例

然而,应该考虑的一件事是,虽然 UDP 基于数据包,但它以您发送它们的确切形式到达另一端,而 TCP 不是。TCP 是一种流协议 - 它具有很强的保证,即数据包被另一端接收,并且发送的数据完全按照发送的顺序接收。但是,不能保证的是,一个调用“发送" 一方面会导致另一侧恰好有一个“接收”调用,返回与发送的完全相同的数据块。这在发送非常长的数据块时尤其重要,其中一个发送映射到多个接收。因此,在尝试向客户端发送响应之前,您应该选择服务器可以等待的分隔符。在 Telnet 中,该分隔符将是“\r\n”。这就是东京' s 解码器/编码器基础设施开始发挥作用。这种编解码器的示例实现是线编解码器。如果你想拥有 Telnet,这正是你想要的。它一次只会给你一条消息,并允许你一次只发送一条这样的消息作为响应:

extern crate tokio;

use tokio::codec::Decoder;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::LinesCodec;
use std::net::SocketAddr;

fn main() {
    let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();

    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket.incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            // Fit the line-based codec on top of the socket. This will take on the task of
            // parsing incomming messages, as well as formatting outgoing ones (appending \r\n).
            let (lines_tx, lines_rx) = LinesCodec::new().framed(socket).split();

            // This takes every incomming message and allows to create one outgoing message for it,
            // essentially generating a stream of responses.
            let responses = lines_rx.map(|incomming_message| {
                // Implement whatever transform rules here
                if incomming_message == "hello" {
                    return String::from("hello from server");
                }
                return incomming_message;
            });

            // At this point `responses` is a stream of `Response` types which we
            // now want to write back out to the client. To do that we use
            // `Stream::fold` to perform a loop here, serializing each response and
            // then writing it out to the client.
            let writes = responses.fold(lines_tx, |writer, response| {
                //Return the future that handles to send the response to the socket
                writer.send(response)
            });

            // Run this request/response loop until the client closes the connection
            // Then return Ok(()), ignoring all eventual errors.
            tokio::spawn(
                writes.then(move |_| Ok(()))
            );

            return Ok(());
        });

    tokio::run(done);
}

推荐阅读