java-8 - 将循环中的处理转换为将许多列表更新为 CompletableFuture
问题描述
我有一个字符串列表,对于列表中的每个项目(基本上是一个文件名),我必须进行一些处理。我有顺序过程工作,但我想将其转换为 CompleatableFuture,我必须在其中更新许多列表。我该怎么办?
这是我的原始代码
Path fileLocation = findLocationForFiles(fileDTSuffix);
Map<String, Map<String, String>> csvMap = getCsvToMap("filename", fileLocation);
List<String> filesMissingCSV = new ArrayList<>();
List<String> expiredAssets = new ArrayList<>();
int filesUploaded = 0;
int filesProcessed = 0;
//TODO : Convert below to use CompletableFuture to perform the actions in thread
for (String fileName : filesToProcessList) {
if(filesProcessed > 100){
System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
cleanupTempDir(fileLocation);
filesProcessed = 0;
}
Path path = Paths.get(fileName);
File file = new File(fileName);
String baseFileName = file.getName();
String translatedNameFromVW = translateFilenameFromVW(baseFileName);
String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
String translatedNameToVW = translateFilenameToVW(baseFileName);
boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);
if (null == csvFileInfo) {
filesMissingCSV.add(fileName + " | " + file.getName());
} else {
if (!isAssetExpired(csvFileInfo)) {
AssetMaster am = generateJSON(fileName, csvFileInfo);
addToCassandra(path, am, translatedNameToVW);
printJson(am, fileLocation);
filesUploaded++;
filesProcessed++;
} else {
expiredAssets.add(fileName + " | " + file.getName());
}
}
}
我正在尝试将其转换为使用 CompletableFutures,如下所示
List<CompletableFuture> asyncThreadsList = new ArrayList();
for (String fileName : filesToProcessList) {
asyncThreadsList.add(CompletableFuture.runAsync(() -> {
try {
processEachFile(csvMap, fileName, fileLocation);
} catch (IOException ex) {
Logger.getLogger(UploadThreaded.class.getName()).log(Level.SEVERE, null, ex);
}
}));
}
CompletableFuture.allOf(asyncThreadsList.toArray(new CompletableFuture[0]))
.thenRunAsync(() -> System.out.println("Ended doing things"));
我的定义processEachFile(csvMap, fileName, fileLocation)
如下
private static CompletableFuture<Void> processEachFile(Map<String, Map<String, String>> csvMap, String fileName, Path fileLocation) throws IOException {
if (filesProcessed > 100) {
System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
cleanupTempDir(fileLocation);
filesProcessed = 0;
}
Path path = Paths.get(fileName);
File file = new File(fileName);
String baseFileName = file.getName();
String translatedNameFromVW = translateFilenameFromVW(baseFileName);
String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
String translatedNameToVW = translateFilenameToVW(baseFileName);
boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);
if (null == csvFileInfo) {
filesMissingCSV.add(fileName + " | " + file.getName());
} else {
if (!isAssetExpired(csvFileInfo)) {
AssetMaster am = generateJSON(fileName, csvFileInfo);
addToCassandra(path, am, translatedNameToVW);
printJson(am, fileLocation);
filesUploaded++;
filesProcessed++;
} else {
expiredAssets.add(fileName + " | " + file.getName());
}
}
return null;
}
我的问题是我需要从每次运行中合并这些列表 : filesMissingCSV
,expiredAssets
并更新计数filesUploaded
并filesProcessed
从每次运行中获取最终值。我很难弄清楚应该如何实现这一目标。
是否有可能做到这一点,应该如何实现?
谢谢
解决方案
@adbdkb评论说我实际上有一个用于目录的外部循环,这个循环用于每个目录中的文件。使用您在此处显示的相同机制,我是否也可以为外部循环创建 CompletableFutures 列表?
所以我决定提供一个简单的解决方案,只涵盖主要的 OP 问题。
首先,我创建了一个类来处理每个文件的结果。
class ProcessDetail {
private int fileUploaded;
private int fileProcessed;
private List<String> expireFile = new ArrayList<>();
private List<String> missingFile = new ArrayList<>();
}
还有一些方法可以简化任务。第一种方法是processEachDir
。我只是传递一个字符串列表作为代表目录文件的参数。此方法返回List<CompletableFuture<ProcessDetail>>
.
private static List<CompletableFuture<ProcessDetail>> processEachDir(List<String> dir) {
System.out.println(dir + " time: " + LocalDateTime.now());
sleep(1);
return dir.stream()
.map(fileName -> processEachFile(fileName))
.collect(Collectors.toList());
}
第二种方法是processEachFile
获取一个代表文件名的字符串参数。这个方法返回CompletableFuture<ProcessDetail>
private static CompletableFuture<ProcessDetail> processEachFile(String fileName) {
System.out.println(fileName + " time: " + LocalDateTime.now());
return CompletableFuture.supplyAsync(() -> {
ProcessDetail processDetail = new ProcessDetail();
if (csvFileInfo()) {
processDetail.getMissingFile().add(fileName);
} else if (!isAssetExpired()) {
processDetail.setFileProcessed(1);
processDetail.setFileUploaded(1);
} else {
processDetail.getExpireFile().add(fileName);
}
return processDetail;
});
}
现在是时候完成任务了:为每个目录下面的代码创建CompletableFuture<ProcessDetail>
和处理文件和其他东西的列表。并将其结果放入以目录名称为键的映射中。processDetails.put(dir.getKey(), result);
directories.entrySet().forEach(dir -> {
List<CompletableFuture<ProcessDetail>> list = processEachDir(dir.getValue());
CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
.thenAccept(nothing -> {
ProcessDetail result = list.stream()
.map(CompletableFuture::join).reduce(new ProcessDetail(), merge);
processDetails.put(dir.getKey(), result);
});
});
推荐阅读
- time - 这是什么时间格式?是 UNIX 吗?
- ruby-on-rails - 使用本机扩展安装 pg 1.1.3 失败
- android - android 应用程序如何跟踪,如果以前安装了应用程序?
- amazon-web-services - AWS IoT 证书突然失效
- java - 使 ArrayList 添加线程安全而不使用线程安全集合
- assembly - 非法指令:PRINTN
- android - Espresso 测试无法通过“依赖冲突”构建
- ios - Xcode 倒计时几天,直到圣诞节
- javascript - 用 html 和 javascript 编写画布
- list - 试图了解如何不将元素添加到列表中