首页 > 解决方案 > 如何使用 Tokio 实现基于拉的系统?

问题描述

我想在服务器和客户端之间实现一个基于拉的系统,服务器只会在客户端请求时推送数据。

我在玩 Tokio 并且能够创建一个基于推送的系统,我能够以 1 毫秒的间隔推送一个字符串。

let done = listener
    .incoming()
    .for_each(move |socket| {
        let server_queue = _cqueue.clone();
        let (reader, mut writer) = socket.split();
        let sender = Interval::new_interval(std::time::Duration::from_millis(1))
            .for_each(move |_| {
                writer
                    .poll_write(server_queue.pull().borrow())
                    .map_err(|_| {
                        tokio::timer::Error::shutdown();
                    })
                    .unwrap();
                return Ok(());
            })
            .map_err(|e| println!("{}", e));
        ;
        tokio::spawn(sender);
        return Ok(());
    })
    .map_err(|e| println!("Future_error {}", e));

有没有办法只在客户要求时才发送而不必使用阅读器?

标签: rustrust-tokio

解决方案


让我们回想一下可能导致这种“发送数据”的事件。您可以考虑多种方式:

  • 客户端连接到服务器。根据合同,这是“要求数据”。你已经实现了这个案例
  • 客户端在连接客户端和服务器的套接字/管道上发送带内消息。为此,您需要使用您已经使用过的AsyncRead部分,并建立一个双工通道,以便您可以同时阅读和交谈socketAsyncWrite
  • 客户端发送带外消息,通常在另一个 proto-host-port 三元组上并使用不同的协议。您当前的服务器识别它,并将该数据发送给客户端。为此,您需要另一个三元组的阅读器,并且您需要一个消息结构来将其中继到可以访问AsyncWrite套接字部分的地方

简短的回答是否定的,你不能真正对你没有在听的事件采取行动。


@Shepmaster我只是想知道是否有一个现有的库可以用来“整齐”地处理这个

有,然后就没有了。

大多数图书馆都以特定问题为中心。在您的情况下,您选择通过拥有 TCP 套接字(实现AsyncRead + AsyncWrite)在尽可能低的级别上工作。

做任何事情,你需要决定:

  1. 一种传输格式
  2. 一个协议

当我需要快速而肮脏的双工流实现时,我倾向于将代码包装到其中:

use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};

enum Message<T:Send+Debug+'static> {
    Content(T),
    Done
}

impl<T: Send + Debug + 'static> From<T> for Message<T> {
    fn from(message:T) -> Message<T> {
        Message::Content(message)
    }
}

struct DuplexStream<T:Send+Debug+'static> {
    writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
    handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}

impl<T:Send+Debug+'static> DuplexStream<T> {

    pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
        where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {

        let (tx, rx) = framed_socket.split();

        // Assemble the combined upstream stream
        let (upstream_tx, upstream_rx) = unbounded();
        let upstream = upstream_rx.take_while(|item| match item {
            Message::Done => future::ok(false),
            _ => future::ok(true)
        }).fold(tx, |o, m| {
            o.send(match m {
                Message::Content(i) => i,
                _ => unreachable!()
            }).map_err(|_| {
                ()
            })
        }).map(|e| {
            Message::Done
        }).into_stream();

        // Assemble the downstream stream
        let downstream = rx.map_err(|_| ()).map(|r| {
            Message::Content(r)
        }).chain(stream::once(Ok(Message::Done)));

        Arc::new(DuplexStream {
            writer: Arc::new(FutLock::new(upstream_tx)),
            handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
                Message::Content(_) => {
                    future::ok(true)
                },
                Message::Done => {
                    future::ok(false)
                }
            })))))
        })
    }

    pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
        Box::new(self.handlers
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
            .map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
                match handler.take() {
                    Some(e) => Box::new(e.map(|r| match r {
                        Message::Content(i) => i,
                        _ => unreachable!()
                    }).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
                    None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
                }
            }).into_stream().flatten()
        )
    }

    pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(Message::Done)
    }
    pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(message.into())
    }
    pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        Box::new(self.writer.write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
                future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
        }))
    }
}

这种结构有很多优点,但也有一些缺点。主要优点是您可以像使用另一种语言一样处理同一对象上的读取和写入部分。对象本身实现Clone(因为它是一个Arc),每个方法都可以在任何地方使用(对旧代码特别有用futures)并且只要您在某处保留它的副本并且不调用close()它就会继续运行(只要底层AsyncRead + AsyncWrite实现还在)。

这并不能免除您从第 1 点和第 2 点的责任,但您可以(并且应该)利用tokio::codec::Framed第 1 点,并将第 2 点作为业务逻辑实施。

一个例子(它实际上是一个测试;-))的用法:

#[test]
fn it_writes() {
    let stream = DuplexStream::from(make_w());
    let stream_write = Arc::clone(&stream);
    let stream_read=  Arc::clone(&stream);
    let dup = Arc::clone(&stream);
    tokio::run(lazy(move || {
        let stream_write = Arc::clone(&stream_write);
        stream_read.start().and_then(move |i| {
            let stream_write = Arc::clone(&stream_write);
            stream_write.send("foo".to_string()).map(|_| i)
        }).collect().map(|r| {
            assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
        }).map_err(|_| {
            assert_eq!(true, false);
        })
    }));
}

推荐阅读