meteor - 如何将mergeMap内部订阅限制为N个最新或滑动窗口队列
问题描述
我有一个从两个流合并的源流。当源流发出事件时,我想调用订阅函数Meteor.subscribe
并保持打开状态,所以我使用mergeMap
. 当订阅准备好时,我通过管道连接到另一个mergeMap
来填充数据。它运行良好,直到我点击 100 次并且内存消耗猛增。问题是,如何限制mergeMap,不是限制在前N 个订阅concurrent: Number
,而是限制到最近的N 个订阅,就像滑动窗口一样?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
我想更详细地解释该代码中发生的事情。
在我的示例中,只要我单击 Web 界面中的按钮,源流(前管道)就永远不会完成,因此当我单击界面中的下一个或上一个按钮时它会发出更改。首先从源流中获取更改并将它们发送到后端 API(也有冲突的命名发布/订阅)。因此,当客户端上的数据可用时,我调用移动到第二个,但我无法销毁或停止流星的订阅。两个原因:1.我想实时更改所选数据,2.如果我停止流星的订阅,客户端的数据将被删除。因此,如果它在服务器上更新,那么现在它会不断更新选定的数据。merge
mergeMap
observer.next(subscription)
mergeMap
mergeMap
因此,在每个 UI 按钮单击(下一个、上一个)之后,我都有新的订阅链。如果原始数据表不大(1000 条记录)并且我只是单击了几次,那没关系。但是,我可以拥有超过 30000 个,并且可以多次单击我的按钮。
所以,我们的想法是让 mergeMap 像一个大小有限的队列,只保存最后 N 个订阅,但是当我单击按钮时,队列一直在变化。
最后编辑:工作代码:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}
解决方案
在不考虑您的代码段的情况下,这就是我的处理方式:
不是并发的前 N 个订阅:数量,而是最近的 N 个,就像一个滑动窗口
如果我理解正确,你会想要这样的东西(假设N = 3
):
N = 3
Crt | 1 | 2 | 3 |
Subscriptions | S1 | S2 | S3 |
When Crt = 4
Crt | 2 | 3 | 4 |
Subscriptions | S2 | S3 | S4 |
如果是这种情况,这就是我的解决方法:
const subscriptionsSubject = new Subject();
src$.pipe(
mergeMap(
data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
.pipe(
takeUntil(
subscriptionsSubject.pipe(
take(N), // After `N` subscriptions, it will complete
filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
)
)
)
)
)
推荐阅读
- android - 在 Android 中从 AsyncTask 迁移到协程?
- c# - 与 ASP.NET Core 中的手动验证相比,自定义验证属性的优势?
- ios - 使用 iOS CallKit 播放系统铃声
- r - 运行具有确定性参数变化的 nlrx 模拟
- python - 熊猫:想要将序列号转换为完整数字
- javascript - 在选择下拉列表时从数组中删除对象
- windows-installer - 根据安装程序设置程序语言
- java - 由于性能原因聚合多个 Hibernate 公式
- c++ - 如何在 librdkafka 中正确重新发送失败的消息?
- r - 从数据框中删除特定的观察结果。这些特定的观察结果存储在 R 中的另一个数据框中