首页 > 解决方案 > 并行运行相同的 DF 模板会产生奇怪的结果

问题描述

我有一个数据流作业,它从 Cloud SQL 中提取数据并将其加载到 Cloud Storage 中。我们已将作业配置为接受参数,因此我们可以使用相同的代码来提取多个表。数据流作业被编译为模板。

当我们串行创建/运行模板实例时,我们会得到我们期望的结果。但是,如果我们并行创建/运行实例,则只有少数文件会出现在 Cloud Storage 上。在这两种情况下,我们都可以看到 DF 作业已成功创建并终止。

例如,我们有 11 个实例,它们产生 11 个输出文件。串行我们得到所有 11 个文件,并行我们只得到大约 3 个文件。在并行运行期间,所有 11 个实例同时运行

任何人都可以就为什么会发生这种情况提供一些建议吗?我假设由 DF 模板创建的临时文件在并行运行期间以某种方式被覆盖?

并行运行的主要动机是更快地提取数据。

编辑

管道非常简单:

        PCollection<String> results =  p
            .apply("Read from Cloud SQL", JdbcIO.<String>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create(dsDriver, dsConnection)
                        .withUsername(options.getCloudSqlUsername())
                        .withPassword(options.getCloudSqlPassword())
                )
                .withQuery(options.getCloudSqlExtractSql())
                .withRowMapper(new JdbcIO.RowMapper<String>() {
                    @Override
                    public String mapRow(ResultSet resultSet) throws Exception {
                        return mapRowToJson(resultSet);
                    }
                })
                .withCoder(StringUtf8Coder.of()));

当我编译模板时

mvn compile exec:java \
 -Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
 -Dexec.args="--project=myproject \
    --region=europe-west1 \
    --stagingLocation=gs://bucket/dataflow/staging/ \
    --cloudStorageLocation=gs://bucket/data/ \
    --cloudSqlInstanceId=yyyy \
    --cloudSqlSchema=dev \
    --runner=DataflowRunner \
    --templateLocation=gs://bucket/dataflow/template/BatchIngestion"

当我调用模板时,我还提供了“tempLocation”。我可以看到正在使用动态临时位置。尽管如此,我在并行运行时并没有看到所有的输出文件。

谢谢

标签: google-cloud-dataflow

解决方案


解决方案

  1. 添加唯一的 tempLocation
  2. 添加唯一的输出路径和文件名
  3. DF 完成处理后,将输出文件移动到 CS 上的最终目的地

推荐阅读