asynchronous - 当有人订阅时做些什么?
问题描述
我有一个 BLE 设备数据流的广播流。
为了从该流中获取数据,我需要先将数据发送到设备。
Stream<Data> dataStream() {
sendDataRequestToDevice();
return _broadcastController.stream;
}
问题在于一切都是异步的,这意味着当返回流时,从设备发送的实际事件很可能已经消失了。我正在寻找类似的东西:
Stream<Data> dataStream() {
return _broadcastController.stream
.doOnSubscribe(() => sendDataRequestToDevice()); // stolen from rxjava ;)
}
在不使用 RxDart 或类似的情况下,默认 Streaming 库中是否有类似的东西。(我只是不想为此目的使用它......)
解决方案
如果你关心你有多少订阅者,你可能不应该使用广播流。广播流的基本思想是它在不知道(或关心)谁在收听的情况下进行广播。onListen
和onCancel
回调最初不是广播控制器的一部分,它们稍微破坏了模型。他们确实让你知道什么时候没有人在听,但仅此而已。
在这种情况下,我会制作自己的流来记录收听和取消。
class _ListenStream<T> extends Stream<T> {
final Stream<T> _source;
final void Function() _onListen;
final void Function() _onCancel;
_ListenStream(this._source, this._onListen, this._onCancel);
bool get isBroadcastStream => _source.isBroadcastStream;
StreamSubscription<T> listen(void Function(T) onData, {
Function onError, void Function() onDone, bool cancelOnError = false}) {
if (_onListen != null) _onListen();
return _ListenSubscription<T>(_source.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError),
_onCancel);
}
}
class _ListenSubscription<T> extends StreamSubscription<T> {
final _StreamSubscription<T> _source;
final void Function() _onCancel;
void onData(void handleData(T data)) { _source.onData(handleData); }
void onError(Function handleError) { _source.onError(handleError); }
void onDone(void handleDone()) { _source.onDone(handleDone); }
void pause([Future resumeSignal]) { _source.pause(resumeSignal); }
void resume() { _source.resume(); }
Future<E> asFuture<E>([E defaultValue]) => _source.asFuture<E>(defaultValue);
bool get isPaused => _source.isPaused;
Future cancel() {
var future = _source.cancel();
if (_onCancel != null) future = future.whenComplete(_onCancel);
return future;
}
}
然后,您不提供原始流,而是为您的客户提供原始流的_ListenStream
包装器,以及您希望在侦听和取消时调用的回调。
推荐阅读
- angular - 使用 Angular 比较日期范围并限制超过 3 个月的日期
- java - 在运行 groovy 脚本时,如何告诉 groovy 加载我的 Maven pom.xml 文件中列出的所有依赖项?
- node.js - 无法使用节点 js 找到 docker compose 中不存在的模块
- python - 如何在 Tweepy 中为 API.search 的 geocode 参数指定多个坐标
- git - 在 Azure git 中锁定分支的 API
- angular - Angular 材质拖放 cdk:将项目合二为一并交换元素
- mysql - 为什么 MySQL Workbench 中的“获取”(如持续时间/获取)需要很长时间才能获取非常有限的数据量?
- typescript - nestjs 套接字连接和断开连接
- graphql - 如何使用 github GraphQL API 创建问题?
- c - malloc() 调用不会跨函数持续存在