google-cloud-dataflow - 用于包含文件名元数据的大量文件 (1M+) 的 Apache Beam/Dataflow 管道
问题描述
我一直在为 GCS 存储桶中存储 1M+ 文件的用例编写 Apache Beam/Dataflow 管道;Bigquery 行输出还需要每个文件的路径。每个输入文件都是一个单行 json 文件。
这是我当前的管道片段:
input_file_path = 'gs://' + BUCKET + '/**'
with beam.Pipeline(options=options) as p:
(p | 'Reading input file' >> beam.io.ReadFromTextWithFilename(input_file_path)
| 'Converting from json to dict' >> beam.ParDo(JSONtoDict())
| 'Write entries into Bigquery' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
所以我使用ReadFromTextWithFilename
的是 Beam Python SDK。
在测试时,它按预期适用于约 1000 个 json 文件。但是,我不确定它是否适用于我计划运行此管道(通过 Google Dataflow)的大型数据集中的 >1M 文件。
在 Java SDK 中,我遇到了withHintMatchesManyFiles。我在 Python SDK 中找不到等价物,我应该切换到 Java SDK 来实现这个功能吗?
考虑到 1M+ 的输入文件,是否有另一种获取每个字符串行的文件名的方法?
解决方案
不知道你在犹豫什么。您能解释一下是什么让您认为它不适用于大量文件吗?
您可以使用 MatchFiles 来匹配和读取多个文件。
readable_files = (
pipeline
| fileio.MatchFiles('hdfs://path/to/*.txt')
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (
readable_files
| beam.Map(lambda x: (x.metadata.path, x.read_utf8())))
参考:
推荐阅读
- atom-editor - 如何从原子打开特定文件?
- r - R - 来自数据集的自定义概率分布的随机绘图
- java - jboss intellij https 管理接口不支持,请禁用它并改用http管理接口
- opengl-es - 如何模拟 GL_TEXTURE_EXTERNAL_OES 纹理?
- php - Laravel,VueJS应用程序在生产中的空白页面
- python - Python 创建表并保存为 CSV 并显示第一行内容
- java - 操作字符串以创建具有相应索引的新字符串
- javascript - 如何限制可以创建元素的次数
- .net - VB.NET 临时修改循环的起始值?
- python - 如何在 SQL Server 上使用 python odbc DROP TABLE?