Adding workers causes a Dataflow job to hang on TextIO.Write; it executes quickly with the DirectRunner (Apache Beam / google-cloud-dataflow)

Problem description
The program reads records from a file, parses them, saves them to a database, and writes any failed records to a Cloud Storage bucket. The test file I'm using produces only 3 failed records. When run locally, the final step

parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));

executes in milliseconds.
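For context, the local run simply forces the DirectRunner through the standard pipeline options. A minimal sketch of that selection (the class name LocalRunSketch is illustrative, and it assumes beam-runners-direct-java is on the classpath):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LocalRunSketch {
    public static void main(String[] args) {
        // Force in-process execution on the local machine.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.setRunner(DirectRunner.class);
        // ... build and run the same pipeline shown below ...
    }
}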
On Dataflow, I'm running the pipeline with 5 workers. Even after the 3 failed records have been written successfully, the job hangs indefinitely on the write step. I can see it stuck at the step:

WriteFailedRecordsToGCS/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key.out0
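The worker count is supplied through the standard Dataflow pipeline options, roughly like this (a sketch, not my exact launcher; project, region, and staging flags come from the command line):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DataflowRunSketch {
    public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setNumWorkers(5);  // the worker count used for this job
        // --project, --region, --tempLocation, etc. are passed on the command line.
        // ... build and run the same pipeline shown below ...
    }
}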
Can anyone tell me why the behavior differs so much between the DirectRunner and Dataflow? The full pipeline is below.
StageUtilizationDataSourceOptions options =
    PipelineOptionsFactory.fromArgs(args).as(StageUtilizationDataSourceOptions.class);

// Output tags for the multi-output parsing step.
final TupleTag<Utilization> parsedRecords = new TupleTag<Utilization>("parsedRecords") {};
final TupleTag<String> failedRecords = new TupleTag<String>("failedRecords") {};

DrgAnalysisDbStage drgAnalysisDbStage = new DrgAnalysisDbStage(options);
HashMap<String, Client> clientKeyMap = drgAnalysisDbStage.getClientKeys();

Pipeline pipeline = Pipeline.create(options);
PCollectionTuple parseResults = PCollectionTuple.empty(pipeline);

// Read the raw records from Cloud Storage.
PCollection<String> records =
    pipeline.apply("ReadFromGCS", TextIO.read().from(options.getGcsFilePath()));

if (FileTypes.utilization.equalsIgnoreCase(options.getFileType())) {
    // Parse each record, tagging successes and failures separately.
    parseResults = records
        .apply("ConvertToUtilizationRecord",
            ParDo.of(new ParseUtilizationFile(parsedRecords, failedRecords, clientKeyMap,
                options.getGcsFilePath()))
            .withOutputTags(parsedRecords, TupleTagList.of(failedRecords)));
    // Persist successfully parsed records to the staging table.
    parseResults.get(parsedRecords)
        .apply("WriteToUtilizationStagingTable",
            drgAnalysisDbStage.writeUtilizationRecordsToStagingTable());
} else {
    logger.error("Unrecognized file type provided: " + options.getFileType());
}

// Write failed records to a failure path in the bucket; this is the step that hangs.
String failureRecordsPath = Utilities.getFailureRecordsPath(options.getGcsFilePath(), options.getFileType());
parseResults.get(failedRecords)
    .apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));

pipeline.run().waitUntilFinish();
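For what it's worth, a variant of the failing write with an explicit shard count would look like the following; the value 1 is illustrative for such a tiny output, and I haven't verified whether it avoids the hang:

// Same write step with a fixed shard count (untested against the hang).
parseResults.get(failedRecords)
    .apply("WriteFailedRecordsToGCS",
        TextIO.write().to(failureRecordsPath).withNumShards(1));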