python - 在 apache 光束管道中使用 MatchFiles() 来获取文件名并在 python 中解析 json
问题描述
我在存储桶中有很多 json 文件,使用 python 3 我想获取文件名,然后创建文件的键值对并读取它们。我相信匹配文件现在适用于 python,但我想知道如何实现它:
files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json")
| #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json
目标是假设我在一个存储桶中有 10 个文件:
gs://mybucket/myfile1.json
gs://mybucket/myfile2.json
并且bucket中的json文件都具有相同的结构
我将它传递给自定义 ParseFile 类(我认为通过 ParDo,我的 apache 束知识是有限的)并且对于 json 中的每一行,我输出一个字典(我将保存到换行符分隔的 json)其中一个键是文件名。
编辑 9/24 上午 11:15 太平洋标准时间:这是我尝试过的
file_content_pairs = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
| beam.ParDo(TestThis())
)
TestThis() 只是应该打印内容:
class TestThis(beam.DoFn):
def process(self, element):
print(element)
print("stop")
yield element
但我在输出中看到的只是:INFO:root:Finished 在 1.2762866020202637 秒内列出 2 个文件。
解决方案
我不明白。你想拥有 的键值对(filename, json-parsed-contents)
吗?
如果是这样,你会:
file_content_pairs = (
p | fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)
因此,如果您的文件如下所示:
==============myfile.json===============
{"a": "b",
"c": "d",
"e": 1}
然后,您的file_content_pairs
集合将包含键值对("myfile.json", {"a":"b", "c": "d", "e": 1})
。
如果您的文件是 json 行格式,您可以:
def consume_file(f):
other_name = query_bigquery(f.metadata.path)
return [(other_name, json.loads(line))
for line in f.read_utf8().strip().split('\n')]
with Pipeline() as p:
result = (p
| fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.FlatMap(consume_file))
推荐阅读
- sql - 如何检查另一个表中是否存在值
- html - Cent OS 服务器上的 Nginx 抛出 404 not found
- c# - 以编程方式创建任务计划程序
- php - 使用php在日期时间添加日期和月份
- java - 在 xtext 中创建对象
- mysql - mysql上的错误说检查手册
- graphql - 将 GraphQL 类型模块化到单独的文件中
- react-router - react-router 和 react-hash-router 有什么区别?
- mysql - 在 C3P0.properties 上设置了多少个连接
- mysql - MySQL 查询将在一个月的最后一个小时内找到日期时间的所有条目