Adding workers causes a Dataflow job to hang on TextIO.Write - runs quickly with DirectRunner - Apache Beam

Problem description

The program reads records from a file, parses them, saves them to a database, and writes any failed records to a Cloud Storage bucket. The test file I am using produces only 3 failed records - when run locally, the final step, parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));, completes in milliseconds.

In Dataflow, I am running the same process with 5 workers. Even after the 3 failed records have been written successfully, the job hangs indefinitely on the write step. I can see it is stuck on the step WriteFailedRecordsToGCS/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key.out0
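For context, the only difference between the two runs is the pipeline configuration. A hedged sketch of how the two runner setups might be expressed (the option setters are standard Beam/Dataflow ones; the project, region, and class names are illustrative, not taken from the question):

```java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerOptionsSketch {
    public static void main(String[] args) {
        DataflowPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

        // Local run: DirectRunner executes the small write in milliseconds.
        options.setRunner(DirectRunner.class);

        // Dataflow run: same pipeline, but distributed across 5 workers.
        // This is the configuration under which the write step hangs.
        options.setRunner(DataflowRunner.class);
        options.setProject("my-project");   // illustrative
        options.setRegion("us-central1");   // illustrative
        options.setNumWorkers(5);
    }
}
```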

Can anyone tell me why the behavior differs so much between DirectRunner and Dataflow? The whole pipeline is below.

        StageUtilizationDataSourceOptions options = PipelineOptionsFactory.fromArgs(args).as(StageUtilizationDataSourceOptions.class);
        final TupleTag<Utilization> parsedRecords = new TupleTag<Utilization>("parsedRecords") {};
        final TupleTag<String> failedRecords = new TupleTag<String>("failedRecords") {};
        DrgAnalysisDbStage drgAnalysisDbStage = new DrgAnalysisDbStage(options);
        HashMap<String, Client> clientKeyMap = drgAnalysisDbStage.getClientKeys();

        Pipeline pipeline = Pipeline.create(options);
        PCollectionTuple parseResults = PCollectionTuple.empty(pipeline);

        PCollection<String> records = pipeline.apply("ReadFromGCS", TextIO.read().from(options.getGcsFilePath()));

        if (FileTypes.utilization.equalsIgnoreCase(options.getFileType())) {
            parseResults = records
                    .apply("ConvertToUtilizationRecord", ParDo.of(new ParseUtilizationFile(parsedRecords, failedRecords, clientKeyMap, options.getGcsFilePath()))
                    .withOutputTags(parsedRecords, TupleTagList.of(failedRecords)));
            parseResults.get(parsedRecords).apply("WriteToUtilizationStagingTable", drgAnalysisDbStage.writeUtilizationRecordsToStagingTable());
        } else {
            logger.error("Unrecognized file type provided: " + options.getFileType());
        }

        String failureRecordsPath = Utilities.getFailureRecordsPath(options.getGcsFilePath(), options.getFileType());
        parseResults.get(failedRecords).apply("WriteFailedRecordsToGCS", TextIO.write().to(failureRecordsPath));

        pipeline.run().waitUntilFinish();

Tags: google-cloud-dataflow, apache-beam, apache-beam-io

Solution
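No answer is recorded on this page. One mitigation commonly suggested for writes that stall inside WriteFiles/FinalizeTempFileBundles is to pin the shard count, so the runner does not insert its own runner-determined sharding (the Reshuffle.ViaRandomKey step the question shows as stuck). This is a hedged sketch against the question's pipeline, not a verified fix; withNumShards is a real TextIO.Write option, while the method and class names around it are illustrative:

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

public class WriteFailedRecordsSketch {
    // failedRecords and failureRecordsPath correspond to
    // parseResults.get(failedRecords) and the path built by
    // Utilities.getFailureRecordsPath in the question.
    static void writeFailures(PCollection<String> failedRecords, String failureRecordsPath) {
        failedRecords.apply("WriteFailedRecordsToGCS",
                TextIO.write()
                        .to(failureRecordsPath)
                        // Only a handful of failure records are expected, so a
                        // single explicit shard replaces the runner-chosen
                        // sharding that goes through Reshuffle.ViaRandomKey.
                        .withNumShards(1));
    }
}
```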
