Converting processing in a loop to CompletableFuture while updating multiple lists

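Problem description

I have a list of strings, and for each item in the list (essentially a file name) I need to do some processing. I have the sequential version working, but I want to convert it to use CompletableFuture, and in doing so I have to update several lists. How should I go about it?

Here is my original code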

Path fileLocation = findLocationForFiles(fileDTSuffix);
Map<String, Map<String, String>> csvMap = getCsvToMap("filename", fileLocation);
List<String> filesMissingCSV = new ArrayList<>();
List<String> expiredAssets = new ArrayList<>();

int filesUploaded = 0;
int filesProcessed = 0;
//TODO : Convert below to use CompletableFuture to perform the actions in thread

for (String fileName : filesToProcessList) {
    if(filesProcessed > 100){
        System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
        cleanupTempDir(fileLocation);
        filesProcessed = 0;
    }
    Path path = Paths.get(fileName);

    File file = new File(fileName);
    String baseFileName = file.getName();
    String translatedNameFromVW = translateFilenameFromVW(baseFileName);
    String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
    String translatedNameToVW = translateFilenameToVW(baseFileName);
    boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
    boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
    Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);

    if (null == csvFileInfo) {
        filesMissingCSV.add(fileName + " | " + file.getName());
    } else {
        if (!isAssetExpired(csvFileInfo)) {
            AssetMaster am = generateJSON(fileName, csvFileInfo);
            addToCassandra(path, am, translatedNameToVW);
            printJson(am, fileLocation);
            filesUploaded++;
            filesProcessed++;
        } else {
            expiredAssets.add(fileName + " | " + file.getName());
        }
    }
}

I am trying to convert it to use CompletableFutures, as shown below

List<CompletableFuture> asyncThreadsList = new ArrayList();

for (String fileName : filesToProcessList) {
    asyncThreadsList.add(CompletableFuture.runAsync(() -> {
        try {
            processEachFile(csvMap, fileName, fileLocation);
        } catch (IOException ex) {
            Logger.getLogger(UploadThreaded.class.getName()).log(Level.SEVERE, null, ex);
        }
    }));
}
CompletableFuture.allOf(asyncThreadsList.toArray(new CompletableFuture[0]))
   .thenRunAsync(() -> System.out.println("Ended doing things"));

My definition of processEachFile(csvMap, fileName, fileLocation) is as follows

private static CompletableFuture<Void> processEachFile(Map<String, Map<String, String>> csvMap, String fileName, Path fileLocation) throws IOException {

    if (filesProcessed > 100) {
        System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
        cleanupTempDir(fileLocation);
        filesProcessed = 0;
    }
    Path path = Paths.get(fileName);

    File file = new File(fileName);
    String baseFileName = file.getName();
    String translatedNameFromVW = translateFilenameFromVW(baseFileName);
    String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
    String translatedNameToVW = translateFilenameToVW(baseFileName);
    boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
    boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
    Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);

    if (null == csvFileInfo) {
        filesMissingCSV.add(fileName + " | " + file.getName());
    } else {
        if (!isAssetExpired(csvFileInfo)) {
            AssetMaster am = generateJSON(fileName, csvFileInfo);
            addToCassandra(path, am, translatedNameToVW);
            printJson(am, fileLocation);
            filesUploaded++;
            filesProcessed++;
        } else {
            expiredAssets.add(fileName + " | " + file.getName());
        }
    }
    return null;
}

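My problem is that I need to merge the lists filesMissingCSV and expiredAssets from every run, and update the counters filesUploaded and filesProcessed, so that I end up with the final values across all runs. I am having a hard time figuring out how to achieve this.

Is this possible, and how should it be done?

Thanks

Tags: java-8, completable-future

Solution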


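@adbdkb commented: "I actually have an outer loop for directories, and this loop is for the files in each directory. Using the same mechanism you have shown here, can I also create a list of CompletableFutures for the outer loop?"

So I decided to provide a simple solution that covers only the OP's main problem.

First, I created a class to hold the result of processing each file.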

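class ProcessDetail {
  private int fileUploaded;
  private int fileProcessed;
  private final List<String> expireFile = new ArrayList<>();
  private final List<String> missingFile = new ArrayList<>();
  int getFileUploaded() { return fileUploaded; }   // accessors used by the code below
  void setFileUploaded(int n) { fileUploaded = n; }
  int getFileProcessed() { return fileProcessed; }
  void setFileProcessed(int n) { fileProcessed = n; }
  List<String> getExpireFile() { return expireFile; }
  List<String> getMissingFile() { return missingFile; }
}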
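The reduction later in this answer refers to a merge function that is never shown. Below is a minimal sketch of what it could look like. The MergeSketch wrapper class, the accessor names, and the decision to build a new object instead of mutating an argument are all assumptions made for illustration, not the answer author's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class MergeSketch {
    // Same shape as the answer's ProcessDetail; accessor names are assumed.
    static class ProcessDetail {
        private int fileUploaded;
        private int fileProcessed;
        private final List<String> expireFile = new ArrayList<>();
        private final List<String> missingFile = new ArrayList<>();

        int getFileUploaded() { return fileUploaded; }
        void setFileUploaded(int n) { fileUploaded = n; }
        int getFileProcessed() { return fileProcessed; }
        void setFileProcessed(int n) { fileProcessed = n; }
        List<String> getExpireFile() { return expireFile; }
        List<String> getMissingFile() { return missingFile; }
    }

    // Hypothetical merge: returns a fresh ProcessDetail and leaves both
    // arguments untouched, so the identity passed to reduce() is never mutated.
    static final BinaryOperator<ProcessDetail> merge = (a, b) -> {
        ProcessDetail out = new ProcessDetail();
        out.setFileUploaded(a.getFileUploaded() + b.getFileUploaded());
        out.setFileProcessed(a.getFileProcessed() + b.getFileProcessed());
        out.getExpireFile().addAll(a.getExpireFile());
        out.getExpireFile().addAll(b.getExpireFile());
        out.getMissingFile().addAll(a.getMissingFile());
        out.getMissingFile().addAll(b.getMissingFile());
        return out;
    };

    public static void main(String[] args) {
        ProcessDetail first = new ProcessDetail();
        first.setFileUploaded(1);
        first.setFileProcessed(1);

        ProcessDetail second = new ProcessDetail();
        second.getMissingFile().add("no-csv.jpg");

        ProcessDetail total = merge.apply(first, second);
        System.out.println(total.getFileUploaded() + " uploaded, missing: " + total.getMissingFile());
    }
}
```

Returning a new object costs a few extra allocations, but it keeps the function free of side effects, which is what Stream.reduce expects of its accumulator.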

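There are also a few methods that simplify the task. The first is processEachDir. I simply pass it a list of strings representing the files in a directory. This method returns List<CompletableFuture<ProcessDetail>>.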

private static List<CompletableFuture<ProcessDetail>> processEachDir(List<String> dir) {
    System.out.println(dir + " time: " + LocalDateTime.now());
    sleep(1);
    return dir.stream()
            .map(fileName -> processEachFile(fileName))
            .collect(Collectors.toList());
} 

The second method is processEachFile, which takes a single string parameter representing the file name. This method returns CompletableFuture<ProcessDetail>.

private static CompletableFuture<ProcessDetail> processEachFile(String fileName) {
    System.out.println(fileName + " time: " + LocalDateTime.now());

    return CompletableFuture.supplyAsync(() -> {
        ProcessDetail processDetail = new ProcessDetail();
        if (csvFileInfo()) {
            processDetail.getMissingFile().add(fileName);
        } else if (!isAssetExpired()) {
            processDetail.setFileProcessed(1);
            processDetail.setFileUploaded(1);
        } else {
            processDetail.getExpireFile().add(fileName);
        }
        return processDetail;
    });
}
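The key idea in processEachFile is that every task builds and returns its own result object instead of touching shared lists or counters, so the tasks need no synchronization. The sketch below illustrates that in isolation. The FileOutcome class, the hasCsvEntry and isExpired stand-ins, and the file-name prefixes they test are hypothetical. Passing an explicit thread pool is also an assumption; it may be worth considering when the real work blocks on file or database I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerFileResultSketch {
    // Per-file outcome; each task owns its own instance, so no locking is needed.
    static class FileOutcome {
        int uploaded;
        final List<String> missing = new ArrayList<>();
        final List<String> expired = new ArrayList<>();
    }

    // Hypothetical stand-ins for the CSV lookup and the expiry check.
    static boolean hasCsvEntry(String fileName) { return !fileName.startsWith("nocsv-"); }
    static boolean isExpired(String fileName) { return fileName.startsWith("old-"); }

    // Runs the classification on the given pool and hands the result back
    // through the future instead of writing to shared state.
    static CompletableFuture<FileOutcome> processEachFile(String fileName, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            FileOutcome outcome = new FileOutcome();
            if (!hasCsvEntry(fileName)) {
                outcome.missing.add(fileName);
            } else if (!isExpired(fileName)) {
                outcome.uploaded = 1;
            } else {
                outcome.expired.add(fileName);
            }
            return outcome;
        }, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            FileOutcome ok = processEachFile("a.jpg", pool).join();
            FileOutcome old = processEachFile("old-b.jpg", pool).join();
            System.out.println("uploaded=" + ok.uploaded + ", expired=" + old.expired);
        } finally {
            pool.shutdown(); // let the JVM exit once the tasks are done
        }
    }
}
```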

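Now it is time to put everything together. For each directory, the code below creates a list of CompletableFuture<ProcessDetail>, processes the files, and merges the per-file results. The merged result is then stored in a map keyed by the directory name: processDetails.put(dir.getKey(), result);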

directories.entrySet().forEach(dir -> {
        List<CompletableFuture<ProcessDetail>> list = processEachDir(dir.getValue());
        CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenAccept(nothing -> {
                    ProcessDetail result = list.stream()
                            .map(CompletableFuture::join).reduce(new ProcessDetail(), merge);
                    processDetails.put(dir.getKey(), result);

                });
});
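The snippet above registers callbacks but never waits for them, and processDetails is written from whichever thread completes the last file of a directory. The sketch below shows one way to handle both points: use a ConcurrentHashMap and join on a final allOf before reading the map. To keep it short, a counter-only Tally class stands in for ProcessDetail and the per-file task simply counts each file as uploaded. Both, along with the processAll method name, are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class DirectoryAggregationSketch {
    // Reduced stand-in for ProcessDetail: a single immutable counter.
    static class Tally {
        final int uploaded;
        Tally(int uploaded) { this.uploaded = uploaded; }
        static Tally merge(Tally a, Tally b) { return new Tally(a.uploaded + b.uploaded); }
    }

    // Hypothetical per-file task: counts every file as uploaded.
    static CompletableFuture<Tally> processEachFile(String fileName) {
        return CompletableFuture.supplyAsync(() -> new Tally(1));
    }

    static Map<String, Tally> processAll(Map<String, List<String>> directories) {
        // Written from completion callbacks on arbitrary threads, hence concurrent.
        Map<String, Tally> processDetails = new ConcurrentHashMap<>();

        List<CompletableFuture<Void>> perDirectory = directories.entrySet().stream()
            .map(dir -> {
                List<CompletableFuture<Tally>> files = dir.getValue().stream()
                    .map(DirectoryAggregationSketch::processEachFile)
                    .collect(Collectors.toList());
                // Once every file of this directory is done, merge and store.
                return CompletableFuture.allOf(files.toArray(new CompletableFuture<?>[0]))
                    .thenAccept(nothing -> processDetails.put(
                        dir.getKey(),
                        files.stream().map(CompletableFuture::join)
                             .reduce(new Tally(0), Tally::merge)));
            })
            .collect(Collectors.toList());

        // Block until every directory has stored its merged result.
        CompletableFuture.allOf(perDirectory.toArray(new CompletableFuture<?>[0])).join();
        return processDetails;
    }

    public static void main(String[] args) {
        Map<String, List<String>> directories = new LinkedHashMap<>();
        directories.put("dirA", Arrays.asList("a1", "a2", "a3"));
        directories.put("dirB", Arrays.asList("b1"));

        Map<String, Tally> result = processAll(directories);
        result.forEach((dir, tally) -> System.out.println(dir + " uploaded=" + tally.uploaded));
    }
}
```

The final join is the only blocking call. Everything before it only wires up futures, so files from different directories can be processed concurrently.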


