首页 > 解决方案 > Beam/Dataflow ReadAllFromParquet 没有读取任何内容,但我的工作仍然成功?

问题描述

我有一个数据流工作:

  1. 从 GCS 读取包含其他文件名的文本文件
  2. 将文件名传递给 ReadAllFromParquet 以读取 .parquet 文件
  3. 写入 BigQuery

尽管我的工作“成功”,但它基本上没有超过 ReadAllFromParquet 步骤的输出集合。我成功地读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet'] 我还确认此列表是正确的,并且文件的 GCS 路径是正确的,在 ReadAllFromParquet 之前的步骤中使用记录器。

这就是我的管道的样子(为简洁起见省略了完整的代码,但我相信它通常可以正常工作,因为我使用 ReadAllFromText 为 .csv 提供了完全相同的管道,并且工作正常):

 with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

    try:

        final_data = (
        pipeline_2
        |'Create empty PCollection' >> beam.Create([None])
        |'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
        |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
        |'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
        |'Process all files' >> beam.ParDo(ProcessSch2())
        |'Transform to rows' >> beam.ParDo(BlisDictSch2())
        |'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table = runtime_options.comp_table, 
                schema = SCHEMA_2,
                project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
                )
        )
    except Exception as exception:
        logging.error(exception)
        pass

这就是我的工作图之后的样子:

在此处输入图像描述

有人知道这里可能出了什么问题以及最好的调试方法是什么?我目前的想法:

  1. 存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我无法下载文件。项目的所有者只有“Storage Legacy Bucket Owner”。我添加了“存储管理员”,然后在使用我自己的帐户手动下载文件时工作正常。根据数据流文档,我已确保默认计算服务帐户和数据流帐户在此存储桶上都具有“存储管理员”。但是,也许这只是一个红鲱鱼,因为最终如果存在权限问题我应该在日志中看到这个并且工作会失败?

  2. ReadAllFromParquet 需要不同格式的文件模式吗?我已经展示了一个列表示例(在我上面的图表中,我可以看到输入集合正确地显示了添加的元素 = 48 用于列表中的 48 个文件)我在上面提供。我知道这种格式适用于 ReadAllFromText,所以我认为它们是等效的并且应该可以工作。

=========

编辑:注意到其他潜在的后果。与我使用 ReadAllFromText 并且工作正常的其他工作相比,我注意到命名略有不匹配,这令人担忧。

这是我的工作的输出集合的名称: 在此处输入图像描述

这就是我的镶木地板工作上的名字,实际上并没有读到任何东西: 在此处输入图像描述

特别注意

Read all files/ReadAllFiles/ReadRange.out0

对比

Read all files/Read all files/ReadRange.out0

路径的第一部分是我在这两个工作中的步骤名称。但我相信第二个是来自 apache_beam.io.filebasedsource ( https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py ) 的 ReadAllFiles 类,其中 ReadAllFromText 和 ReadAllFromParquet称呼。

似乎是一个潜在的错误,但似乎无法在源代码中跟踪它。

============= 编辑2

经过更多的挖掘之后,ReadAllFromParquet 似乎还没有起作用。ReadFromParquet 调用 apache_beam.io.parquetio._ParquetSource,而 ReadAllFromParquet 只调用
apache_beam.io.filebasedsource._ReadRange。

我想知道如果它是一个实验功能,是否有办法打开它?

标签: google-cloud-platformgoogle-cloud-dataflowapache-beamgoogle-cloud-iam

解决方案


如果您使用的是最后一个 Beam SDK,您没有提到,请尝试使用 SDK 2.16 来测试最后的更改。

该文档指出ReadAllFromParquet和 ReadFromParquet 一样是一个实验性功能;尽管如此,据报道ReadFromParquet在此线程Apache-Beam: Read Parquet files from nested HDFS directory中工作,您可能想尝试使用此功能。


推荐阅读