首页 > 解决方案 > foreach循环中的completablefuture runasync

问题描述

我在 foreach 循环中定义了 completablefuture runasync() 任务。

我需要在 nosql db 中插入记录,并且需要更新与 sql db 中的“已处理”相同的插入记录(将数据从 sqldb DB2 迁移到 nosql mongodb)。

为了实现这一点,我在 runasyn() 中定义了 mongo 插入过程,并在 thenAccept() 中定义了更新 db2 中处理的数据的函数(检查代码片段)

所以问题是在 mongo 中的每个记录插入之后,我将插入的记录保留在列表中,并尝试一次性更新 db2 中的整个列表,但它的行为并非如此,对于 mongo 中的每次插入,每次记录更新都会命中 db2 但是这个在处理数千条记录时是不可行的方法。我的期望是首先获得 mongo 插入记录的列表,然后一次性将这些记录列表更新为 db2 中的“已处理”。这种方法有可能吗?(我知道在 foreach 循环中定义了 mongo 插入和 db2 更新,但我希望 mongo 应该完成所有条目的插入,然后需要一次性更新整个 mongo 插入列表的 db2)或者我是否能够从 mongoProcess 至少返回插入记录的列表.

代码片段:

unprocessedList.foreach(entry-> {
Completablefuture <Void> cf= Completablefuture.runAsync(() -> {
mongoHelper.processInMongo(entry, getObj(entry)) } , executor). thenAccept ( 
updateInDb2 ( entryList)) });

标签: multithreadingconcurrencyjava-8completable-future

解决方案


如果我对您的理解正确,您想完成所有插入,然后才进行一次更新。您可以更改您的代码以使用 Promises(我自己对 Java 不太熟悉)所以这里有一个针对您的问题的“基本”解决方案。

请注意,您应该将atomicInterger用于计数器变量,因为 ++ 不是原子的,否则将无法 100% 工作

function x (count, expected) {
 if(count == expected) {
    updateInDb2 ( entryList)) 
   }
}

counter = 0:

unprocessedList.foreach(entry-> {
    Completablefuture <Void> cf= Completablefuture.runAsync(() -> {
    mongoHelper.processInMongo(entry, getObj(entry)) } , executor).thenAccept(
   counter++;
   x(counter, unprocessedList.length);
 });

正如我所说,我对 Java 承诺不太熟悉,但更好的解决方案是:

await promise = 插入所有 mongo 文档,promise 已完成,然后更新列表


推荐阅读