首页 > 解决方案 > 如何将字典作为 PCollection 返回?

问题描述

我得到了一个谷歌云存储桶的 URL。我必须:

  1. 使用 URL 获取该存储桶中的 blob 列表

  2. 对于每个 blob,我都会进行一些 GCS API 调用以获取有关 blob 的信息(blob.size、blob.name 等)

  3. 对于每个 blob,我还必须阅读它,在其中找到一些内容并将其添加到从 GCS API 调用获得的值中

  4. 对于每个 blob,我必须将在第 2 步和第 3 步中找到的关于 blob 的值写入 BigQuery

我有数千个 blob,所以这需要使用 Apache Beam 来完成(我被推荐过)

我对管道的想法是这样的:

GetUrlOfBucket 并生成 PCollection

使用该 PCollection 获取 blob 列表作为新的 PCollection

使用这些 blob 的元数据创建一个 PCollection

执行一个转换,它将接收作为元数据值字典的 PCollection,进入 blob,扫描一个值并返回一个新的 PCollection,它是元数据值的字典和这个新值

将此写入 BigQuery。

我特别难以考虑如何将字典作为 PCollection 返回

[+] 我读到的:

https://beam.apache.org/documentation/programming-guide/#composite-transforms

https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366

非常感谢任何建议,特别是关于如何获取该存储桶名称并返回 PCollection 的 blob。

标签: pythonpython-2.7google-cloud-storageapache-beam

解决方案


我通过阅读有关 apache-beam 的更多信息并弄清楚我必须使用 ParDo 函数在我的资源之间拆分作业来解决这个问题,在 ParDo 中我调用我的 DoFn 函数,该函数接受一个元素并执行它所需的所有处理并产生一个 dic。请参阅这篇文章Apache Beam:如何同时创建许多经历相同 PTransform 的 PCollection?

    class ExtractMetadata(beam.DoFn):                                                                                                                                                                                                                                                  
def process(self, element):                                                                                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    Takes in a blobName, fetches the blob and its values and returns a dictionary of values                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    metadata = element.metadata                                                                                                                                                                                                                                                
    if metadata is not None:                                                                                                                                                                                                                                                   
        event_count = int(metadata['count'])                                                                                                                                                                                                                                   
    else:                                                                                                                                                                                                                                                                      
        event_count = None                                                                                                                                                                                                                                                     

    event_type = self.determine_event_type(element.id)                                                                                                                                                                                                                         
    cluster    = self.determine_cluster(element.id)                                                                                                                                                                                                                            
    customer   = self.determine_customer(element)                                                                                                                                                                                                                              
   # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')                                                                                                                                                                                                   
    #date = date.isoformat()                                                                                                                                                                                                                                                   
    dic = {                                                                                                                                                                                                                                                                    
        'blob_name'       : element.name,                                                                                                                                                                                                                                      
        'event_path'      : element.path,                                                                                                                                                                                                                                      
        'size'            : int(element.size),                                                                                                                                                                                                                                 
        'time_of_creation': element.time_created.isoformat(),                                                                                                                                                                                                                  
        'event_count'     : event_count,                                                                                                                                                                                                                                       
        'event_type'      : event_type,                                                                                                                                                                                                                                        
        'cluster'         : cluster,                                                                                                                                                                                                                                           
        'customer'        : customer                                                                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                          
    yield dic

推荐阅读