python - 如何将字典作为 PCollection 返回?
问题描述
我得到了一个谷歌云存储桶的 URL。我必须:
使用 URL 获取该存储桶中的 blob 列表
对于每个 blob,我都会进行一些 GCS API 调用以获取有关 blob 的信息(blob.size、blob.name 等)
对于每个 blob,我还必须阅读它,在其中找到一些内容并将其添加到从 GCS API 调用获得的值中
对于每个 blob,我必须将在第 2 步和第 3 步中找到的关于 blob 的值写入 BigQuery
我有数千个 blob,所以这需要使用 Apache Beam 来完成(我被推荐过)
我对管道的想法是这样的:
GetUrlOfBucket 并生成 PCollection
使用该 PCollection 获取 blob 列表作为新的 PCollection
使用这些 blob 的元数据创建一个 PCollection
执行一个转换,它将接收作为元数据值字典的 PCollection,进入 blob,扫描一个值并返回一个新的 PCollection,它是元数据值的字典和这个新值
将此写入 BigQuery。
我特别难以考虑如何将字典作为 PCollection 返回
[+] 我读到的:
https://beam.apache.org/documentation/programming-guide/#composite-transforms
https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366
非常感谢任何建议,特别是关于如何获取该存储桶名称并返回 PCollection 的 blob。
解决方案
我通过阅读有关 apache-beam 的更多信息并弄清楚我必须使用 ParDo 函数在我的资源之间拆分作业来解决这个问题,在 ParDo 中我调用我的 DoFn 函数,该函数接受一个元素并执行它所需的所有处理并产生一个 dic。请参阅这篇文章Apache Beam:如何同时创建许多经历相同 PTransform 的 PCollection?
class ExtractMetadata(beam.DoFn):
def process(self, element):
"""
Takes in a blobName, fetches the blob and its values and returns a dictionary of values
"""
metadata = element.metadata
if metadata is not None:
event_count = int(metadata['count'])
else:
event_count = None
event_type = self.determine_event_type(element.id)
cluster = self.determine_cluster(element.id)
customer = self.determine_customer(element)
# date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')
#date = date.isoformat()
dic = {
'blob_name' : element.name,
'event_path' : element.path,
'size' : int(element.size),
'time_of_creation': element.time_created.isoformat(),
'event_count' : event_count,
'event_type' : event_type,
'cluster' : cluster,
'customer' : customer
}
yield dic
推荐阅读
- android - Webview 中的全屏视频在 Flutter 中不起作用
- java - 当类在不同的模块中时,ClassLoader.loadClass 方法上的 ClassNotFound 异常
- perl - 在除法 {/} 中使用未初始化的值 $d
- vba - VBA Split Cells and paste only specific cells
- css - Angular 7 CSS 网格布局中的 NgStyle 无法正常工作?
- python - 如何在具有多对多关系的 django 中反向序列化
- unity3d - HoloLens 空间映射不可用
- sql - 在一定的限制下将结果压缩为 1 个结果
- office-js - Outlook Web 插件无法读取 Outlook-web-16.01.js 中 null 的属性“版本”
- ios - Flutter 问题,ios 应用无法打开且没有图标