首页 > 解决方案 > 如何在 RX Java 中处理嵌套事件

问题描述

我对 RX java 很陌生,我面临一个问题。在我有一组事件的地方,一个事件的输出必须作为另一个事件的输入,并且可能有 n 个事件。我试图通过以下方式做到这一点。

    public Single<List<EventMetaData>> executeAction(Long blogId){

    Single<BlogPostData> dbblogPostData = manager.findById(blogId);


    Single<BlogPostData> chain = dbblogPostData;
    List<Single<BlogPostData>> allBlogPost = new ArrayList<>();
    allSnapshots.add(dbblogPostData);


    for (EventEntry eventEntry : Events) {
        // create a chain where response of one event is passed to next event


        chain = chain.flatMap(blogPostData -> {
            Single<BlogPostData> newSnapshot = executeEvent(eventEntry, blogPostData);

            return newSnapshot;
        });


        allBlogPost.add(chain);

    }

    return  Single.zip(allBlogPost, objects -> {
                List<blogPostData> snapshotEntries = new ArrayList<>();
                for (Object obj : objects) {
                    snapshotEntries.add((blogPostData) obj);
                }
                return getMetaData(snapshotEntries);
            });
}


 private Single<BlogPostData> executeEvent(Event event, BlogPostData blogpostEntry) throws ManagerException {



    Single<BlogPostData> clientSnapshotSingle =
            facade.getSnpashotFromClient(blogpostEntry, event);


    return clientSnapshotSingle.flatMap(
            blogPost -> {

               // Do some logic here on get new Snapshot,  that needs to be save it to DB


                blogpostEntry = utils.getDBUpdatableData(blogpostEntry, event.getRefId);
                if (event.isDbupdatable) {

                     // This methods returns Single<BlogPostData>
                    return manager.create(blogpostEntry);
                }
                return Single.just(blogpostEntry);
            });

}

现在如果我有两个事件,比如说 e1,e2。这段代码

chain.flatMap(blogPostData -> { Single<BlogPostData> newSnapshot = executeEvent(eventEntry, blogPostData);
      return newSnapshot;}

正在执行 3 次(事件 e1 两次,e2 一次)。当我这样订阅它时。对于 4 个事件,它将总共执行 6 次。

 Single<List<EventMetaData>> listOfMeta;
     (listOfMeta.subscribe()); 

为了简化我的例子。当调用 executeAction 时,我从 DB 中获取博客条目,然后在其上运行一组事件(通过 executeEvent() 调用)。现在由于所有事件都是连续的,我需要将第一个事件的输出传递给第二个事件的输入,依此类推。我还将事件的所有输出存储在 ListList<Single> allBlogPost中以对其进行一些计算Single.zip(allBlogPost)

所以有人可以帮我弄清楚我的代码有什么问题,以及我怎样才能准确地执行所有事件(如果我应该使用任何其他运算符来执行此操作)n 次并将一个事件的结果传递给另一个事件。

标签: javaspringrx-javareactive-programming

解决方案


推荐阅读