首页 > 解决方案 > 在 apache 光束管道中使用 MatchFiles() 来获取文件名并在 python 中解析 json

问题描述

我在存储桶中有很多 json 文件,使用 python 3 我想获取文件名,然后创建文件的键值对并读取它们。我相信匹配文件现在适用于 python,但我想知道如何实现它:

files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json") 
    | #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json

目标是假设我在一个存储桶中有 10 个文件:

gs://mybucket/myfile1.json
gs://mybucket/myfile2.json

并且bucket中的json文件都具有相同的结构

我将它传递给自定义 ParseFile 类(我认为通过 ParDo,我的 apache 束知识是有限的)并且对于 json 中的每一行,我输出一个字典(我将保存到换行符分隔的 json)其中一个键是文件名。

编辑 9/24 上午 11:15 太平洋标准时间:这是我尝试过的

file_content_pairs = (p 
                | fileio.MatchFiles(known_args.input_bucket)
                | fileio.ReadMatches()
                | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
                | beam.ParDo(TestThis())
                )

TestThis() 只是应该打印内容:

class TestThis(beam.DoFn):

    def process(self, element):
        print(element)
        print("stop")
        yield element

但我在输出中看到的只是:INFO:root:Finished 在 1.2762866020202637 秒内列出 2 个文件。

标签: pythongoogle-cloud-dataflowapache-beamapache-beam-io

解决方案


我不明白。你想拥有 的键值对(filename, json-parsed-contents)吗?

如果是这样,你会:

file_content_pairs = (
  p | fileio.MatchFiles("gs://mybucketname/*.json")
    | fileio.ReadMatches()
    | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)

因此,如果您的文件如下所示:

==============myfile.json===============
{"a": "b",
 "c": "d",
 "e": 1}

然后,您的file_content_pairs集合将包含键值对("myfile.json", {"a":"b", "c": "d", "e": 1})


如果您的文件是 json 行格式,您可以:

def consume_file(f):
  other_name = query_bigquery(f.metadata.path)
  return [(other_name, json.loads(line))
          for line in f.read_utf8().strip().split('\n')]

with Pipeline() as p:
  result = (p
            | fileio.MatchFiles("gs://mybucketname/*.json")
            | fileio.ReadMatches()
            | beam.FlatMap(consume_file))

推荐阅读