首页 > 解决方案 > 数据流写入 GCS 存储桶,但文件名中的时间戳未更改

问题描述

我有一个关于 Apache Beam 的问题,尤其是关于数据流的问题。

我有一个从 cloudsql 数据库读取并写入 GCS 的管道。文件名中有一个时间戳。我希望每次运行它时,它都会生成一个带有不同时间戳的文件。

我在本地机器上进行了测试。Beam 从 postgres 数据库读取并写入文件(而不是 gcs)。它工作正常。生成的文件中有不同的时间戳。喜欢

jdbc_output.csv-00000-of-00001_2020-08-19_00:11:17.csv

jdbc_output.csv-00000-of-00001_2020-08-19_00:25:07.csv

但是,当我部署到 Dataflow 时,通过 Airflow 触发它(我们将气流作为调度程序),它生成的文件名始终使用相同的时间戳。即使我多次运行时间戳也不会改变。时间戳与上传数据流模板的时间非常接近。

这是要编写的简单代码。

output.apply("Write to Bucket", TextIO.write().to("gs://my-bucket/filename").withNumShards(1)
      .withSuffix("_" +  String.valueOf(new Timestamp(new Date().getTime())).replace(" ","_") +".csv"));

我想知道为什么数据流不使用文件名中的当前时间,而是使用上传模板文件时的时间戳。

此外,如何解决这个问题?我的计划是每天运行数据流,并期待一个具有不同时间戳的新文件。

标签: google-cloud-platformairflowapache-beamdataflowbucket

解决方案


我的直觉(因为我从未测试过它)是模板创建会启动您的管道并对其进行快照。因此,您的管道已运行,您的日期时间被评估并在模板中保持原样。并且价值永远不会改变。

文档描述还提到管道在模板创建之前运行,就像编译一样。

  1. 开发人员运行管道并创建模板。Apache Beam SDK 在 Cloud Storage 中暂存文件,创建模板文件(类似于作业请求),并将模板文件保存在 Cloud Storage 中。

要解决此问题,您可以使用ValueProvider界面。而且,我之前从未创建过链接,但它在文档的模板部分

注意:但是,对于 Cloud SQL 数据库中的简单读取和导出到文件,最便宜且最容易维护的方法是不使用 Dataflow!


推荐阅读