首页 > 解决方案 > 当有人订阅时做些什么?

问题描述

我有一个 BLE 设备数据流的广播流。

为了从该流中获取数据,我需要先将数据发送到设备。

Stream<Data> dataStream() {
  sendDataRequestToDevice();
  return _broadcastController.stream;
}

问题在于一切都是异步的,这意味着当返回流时,从设备发送的实际事件很可能已经消失了。我正在寻找类似的东西:

Stream<Data> dataStream() {
  return _broadcastController.stream
    .doOnSubscribe(() => sendDataRequestToDevice()); // stolen from rxjava ;)
}

在不使用 RxDart 或类似的情况下,默认 Streaming 库中是否有类似的东西。(我只是不想为此目的使用它......)

标签: asynchronousdartstream

解决方案


如果你关心你有多少订阅者,你可能不应该使用广播流。广播流的基本思想是它在不知道(或关心)谁在收听的情况下进行广播。onListenonCancel回调最初不是广播控制器的一部分,它们稍微破坏了模型。他们确实让你知道什么时候没有人在听,但仅此而已。

在这种情况下,我会制作自己的流来记录收听和取消。

class _ListenStream<T> extends Stream<T> {
  final Stream<T> _source;
  final void Function() _onListen; 
  final void Function() _onCancel;
  _ListenStream(this._source, this._onListen, this._onCancel);
  bool get isBroadcastStream => _source.isBroadcastStream;
  StreamSubscription<T> listen(void Function(T) onData, {
    Function onError, void Function() onDone, bool cancelOnError = false}) {
    if (_onListen != null) _onListen();
    return _ListenSubscription<T>(_source.listen(onData, 
        onError: onError, onDone: onDone, cancelOnError: cancelOnError),
        _onCancel);  
  }
}
class _ListenSubscription<T> extends StreamSubscription<T> {
  final _StreamSubscription<T> _source;
  final void Function() _onCancel;

  void onData(void handleData(T data)) { _source.onData(handleData); }
  void onError(Function handleError) { _source.onError(handleError); }
  void onDone(void handleDone()) { _source.onDone(handleDone); }

  void pause([Future resumeSignal]) { _source.pause(resumeSignal); }
  void resume() { _source.resume(); }
  Future<E> asFuture<E>([E defaultValue]) => _source.asFuture<E>(defaultValue);
  bool get isPaused => _source.isPaused;

  Future cancel() {
    var future = _source.cancel();
    if (_onCancel != null) future = future.whenComplete(_onCancel);
    return future;
  }
}

然后,您不提供原始流,而是为您的客户提供原始流的_ListenStream包装器,以及您希望在侦听和取消时调用的回调。


推荐阅读