google-cloud-platform - Beam/Dataflow ReadAllFromParquet 没有读取任何内容,但我的工作仍然成功?
问题描述
我有一个数据流工作:
- 从 GCS 读取包含其他文件名的文本文件
- 将文件名传递给 ReadAllFromParquet 以读取 .parquet 文件
- 写入 BigQuery
尽管我的工作“成功”,但它基本上没有超过 ReadAllFromParquet 步骤的输出集合。我成功地读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet']
我还确认此列表是正确的,并且文件的 GCS 路径是正确的,在 ReadAllFromParquet 之前的步骤中使用记录器。
这就是我的管道的样子(为简洁起见省略了完整的代码,但我相信它通常可以正常工作,因为我使用 ReadAllFromText 为 .csv 提供了完全相同的管道,并且工作正常):
with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
try:
final_data = (
pipeline_2
|'Create empty PCollection' >> beam.Create([None])
|'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
|'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
|'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
|'Process all files' >> beam.ParDo(ProcessSch2())
|'Transform to rows' >> beam.ParDo(BlisDictSch2())
|'Write to BigQuery' >> beam.io.WriteToBigQuery(
table = runtime_options.comp_table,
schema = SCHEMA_2,
project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, #'CREATE_IF_NEEDED',#create if does not exist.
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND #'WRITE_APPEND' #add to existing rows,partitoning
)
)
except Exception as exception:
logging.error(exception)
pass
这就是我的工作图之后的样子:
有人知道这里可能出了什么问题以及最好的调试方法是什么?我目前的想法:
存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我无法下载文件。项目的所有者只有“Storage Legacy Bucket Owner”。我添加了“存储管理员”,然后在使用我自己的帐户手动下载文件时工作正常。根据数据流文档,我已确保默认计算服务帐户和数据流帐户在此存储桶上都具有“存储管理员”。但是,也许这只是一个红鲱鱼,因为最终如果存在权限问题我应该在日志中看到这个并且工作会失败?
ReadAllFromParquet 需要不同格式的文件模式吗?我已经展示了一个列表示例(在我上面的图表中,我可以看到输入集合正确地显示了添加的元素 = 48 用于列表中的 48 个文件)我在上面提供。我知道这种格式适用于 ReadAllFromText,所以我认为它们是等效的并且应该可以工作。
=========
编辑:注意到其他潜在的后果。与我使用 ReadAllFromText 并且工作正常的其他工作相比,我注意到命名略有不匹配,这令人担忧。
特别注意
Read all files/ReadAllFiles/ReadRange.out0
对比
Read all files/Read all files/ReadRange.out0
路径的第一部分是我在这两个工作中的步骤名称。但我相信第二个是来自 apache_beam.io.filebasedsource ( https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py ) 的 ReadAllFiles 类,其中 ReadAllFromText 和 ReadAllFromParquet称呼。
似乎是一个潜在的错误,但似乎无法在源代码中跟踪它。
============= 编辑2
经过更多的挖掘之后,ReadAllFromParquet 似乎还没有起作用。ReadFromParquet 调用 apache_beam.io.parquetio._ParquetSource,而 ReadAllFromParquet 只调用
apache_beam.io.filebasedsource._ReadRange。
我想知道如果它是一个实验功能,是否有办法打开它?
解决方案
如果您使用的是最后一个 Beam SDK,您没有提到,请尝试使用 SDK 2.16 来测试最后的更改。
该文档指出ReadAllFromParquet和 ReadFromParquet 一样是一个实验性功能;尽管如此,据报道ReadFromParquet在此线程Apache-Beam: Read Parquet files from nested HDFS directory中工作,您可能想尝试使用此功能。
推荐阅读
- file - 为什么 readInt() 在 readInt> 266 时给我一个不正确的值?
- variables - 在 Dreamhost 上存储秘密变量和密钥
- reactjs - 无法使用 ReactJs、Axios、Redux 渲染/显示从 api 获取的数据
- javascript - 使用 RSA 私钥解密 Web 应用程序中的数据
- python - 为什么 PyTorch 优化器可能无法更新其参数?
- flutter - 带有身份验证和 Web 支持的颤动导航 - 定义主路由
- swift - 在 Swift 中将文本打印到崩溃文件
- keyboard - azure cloud shell 上的奇怪键盘响应
- .net - pythonnet安装失败
- php - Symfony“无效凭据”