java - 如何在 RX Java 中处理嵌套事件
问题描述
我对 RX java 很陌生,我面临一个问题。在我有一组事件的地方,一个事件的输出必须作为另一个事件的输入,并且可能有 n 个事件。我试图通过以下方式做到这一点。
public Single<List<EventMetaData>> executeAction(Long blogId){
Single<BlogPostData> dbblogPostData = manager.findById(blogId);
Single<BlogPostData> chain = dbblogPostData;
List<Single<BlogPostData>> allBlogPost = new ArrayList<>();
allSnapshots.add(dbblogPostData);
for (EventEntry eventEntry : Events) {
// create a chain where response of one event is passed to next event
chain = chain.flatMap(blogPostData -> {
Single<BlogPostData> newSnapshot = executeEvent(eventEntry, blogPostData);
return newSnapshot;
});
allBlogPost.add(chain);
}
return Single.zip(allBlogPost, objects -> {
List<blogPostData> snapshotEntries = new ArrayList<>();
for (Object obj : objects) {
snapshotEntries.add((blogPostData) obj);
}
return getMetaData(snapshotEntries);
});
}
private Single<BlogPostData> executeEvent(Event event, BlogPostData blogpostEntry) throws ManagerException {
Single<BlogPostData> clientSnapshotSingle =
facade.getSnpashotFromClient(blogpostEntry, event);
return clientSnapshotSingle.flatMap(
blogPost -> {
// Do some logic here on get new Snapshot, that needs to be save it to DB
blogpostEntry = utils.getDBUpdatableData(blogpostEntry, event.getRefId);
if (event.isDbupdatable) {
// This methods returns Single<BlogPostData>
return manager.create(blogpostEntry);
}
return Single.just(blogpostEntry);
});
}
现在如果我有两个事件,比如说 e1,e2。这段代码
chain.flatMap(blogPostData -> { Single<BlogPostData> newSnapshot = executeEvent(eventEntry, blogPostData);
return newSnapshot;}
正在执行 3 次(事件 e1 两次,e2 一次)。当我这样订阅它时。对于 4 个事件,它将总共执行 6 次。
Single<List<EventMetaData>> listOfMeta;
(listOfMeta.subscribe());
为了简化我的例子。当调用 executeAction 时,我从 DB 中获取博客条目,然后在其上运行一组事件(通过 executeEvent() 调用)。现在由于所有事件都是连续的,我需要将第一个事件的输出传递给第二个事件的输入,依此类推。我还将事件的所有输出存储在 ListList<Single> allBlogPost
中以对其进行一些计算Single.zip(allBlogPost)
。
所以有人可以帮我弄清楚我的代码有什么问题,以及我怎样才能准确地执行所有事件(如果我应该使用任何其他运算符来执行此操作)n 次并将一个事件的结果传递给另一个事件。
解决方案
推荐阅读
- encoding - 更改堆栈交换 api 请求中的编码
- java - 使用 Java 中的 JSON 密钥访问 GCS 并获取方法抛出“com.google.cloud.storage.StorageException”异常
- json - 解析 loadbalancer.yaml 时出错:将 YAML 转换为 JSON 时出错:yaml:第 4 行:找到无法启动任何令牌的字符
- arrays - MPI_GATHERV (Fortran) 从二维子矩阵创建一个新的二维矩阵
- typescript - VSCODE:打字稿。运行测试说我遇到了嵌套问题。无法弄清楚如何解决
- azure - Azure 突触 - 使用 createorreplaceglobaltempview 跨笔记本共享数据 - pyspark
- r - R:使用网格的馈送函数
- c# - 如何使用 MSI 而不是 Service Principal 来调用部署在 Azure 上的 Authenticate web api
- java - Java中抛出异常的单元测试?
- r - RMarkdown:如何将大型结果打印或分解为选项卡式内容(选项卡)