google-cloud-dataflow - 并行运行相同的 DF 模板会产生奇怪的结果
问题描述
我有一个数据流作业,它从 Cloud SQL 中提取数据并将其加载到 Cloud Storage 中。我们已将作业配置为接受参数,因此我们可以使用相同的代码来提取多个表。数据流作业被编译为模板。
当我们串行创建/运行模板实例时,我们会得到我们期望的结果。但是,如果我们并行创建/运行实例,则只有少数文件会出现在 Cloud Storage 上。在这两种情况下,我们都可以看到 DF 作业已成功创建并终止。
例如,我们有 11 个实例,它们产生 11 个输出文件。串行我们得到所有 11 个文件,并行我们只得到大约 3 个文件。在并行运行期间,所有 11 个实例同时运行
任何人都可以就为什么会发生这种情况提供一些建议吗?我假设由 DF 模板创建的临时文件在并行运行期间以某种方式被覆盖?
并行运行的主要动机是更快地提取数据。
编辑
管道非常简单:
PCollection<String> results = p
.apply("Read from Cloud SQL", JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(dsDriver, dsConnection)
.withUsername(options.getCloudSqlUsername())
.withPassword(options.getCloudSqlPassword())
)
.withQuery(options.getCloudSqlExtractSql())
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
return mapRowToJson(resultSet);
}
})
.withCoder(StringUtf8Coder.of()));
当我编译模板时
mvn compile exec:java \
-Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
-Dexec.args="--project=myproject \
--region=europe-west1 \
--stagingLocation=gs://bucket/dataflow/staging/ \
--cloudStorageLocation=gs://bucket/data/ \
--cloudSqlInstanceId=yyyy \
--cloudSqlSchema=dev \
--runner=DataflowRunner \
--templateLocation=gs://bucket/dataflow/template/BatchIngestion"
当我调用模板时,我还提供了“tempLocation”。我可以看到正在使用动态临时位置。尽管如此,我在并行运行时并没有看到所有的输出文件。
谢谢
解决方案
解决方案
- 添加唯一的 tempLocation
- 添加唯一的输出路径和文件名
- DF 完成处理后,将输出文件移动到 CS 上的最终目的地
推荐阅读
- typo3 - 在 TYPO3 8.7.x 中从 FE 用户获取数据到自己的 EXT
- azure - Linux 上的 Azure Web 应用程序:“错误:容器没有响应端口:8080 上的 HTTP ping”-使用时:“start”:“pm2 start server.js”
- matlab - 双浮点数到二进制转换
- python - 在 django 模型中为标签字段编写验证器
- c++ - 如何打印作为参数传递的函数名称?
- html - 2 个不同的动画不能在 1 个组件上一起工作
- javascript - 需要从键值搜索中删除两个引用
- python - 如何将遮罩应用于张量并保持其原始形状
- java - 无法使用 java 在 db2 中为 SEQ 插入 Nextval
- amazon-web-services - AWS API Gateway Websocket UnknownError