首页 > 解决方案 > 如何将字典作为 PCollection 返回?


我得到了一个谷歌云存储桶的 URL。我必须:

  1. 使用 URL 获取该存储桶中的 blob 列表

  2. 对于每个 blob,我都会进行一些 GCS API 调用以获取有关 blob 的信息(blob.size、blob.name 等)

  3. 对于每个 blob,我还必须阅读它,在其中找到一些内容并将其添加到从 GCS API 调用获得的值中

  4. 对于每个 blob,我必须将在第 2 步和第 3 步中找到的关于 blob 的值写入 BigQuery

我有数千个 blob,所以这需要使用 Apache Beam 来完成(我被推荐过)


GetUrlOfBucket 并生成 PCollection

使用该 PCollection 获取 blob 列表作为新的 PCollection

使用这些 blob 的元数据创建一个 PCollection

执行一个转换,它将接收作为元数据值字典的 PCollection,进入 blob,扫描一个值并返回一个新的 PCollection,它是元数据值的字典和这个新值

将此写入 BigQuery。

我特别难以考虑如何将字典作为 PCollection 返回

[+] 我读到的:



非常感谢任何建议,特别是关于如何获取该存储桶名称并返回 PCollection 的 blob。

标签: pythonpython-2.7google-cloud-storageapache-beam


我通过阅读有关 apache-beam 的更多信息并弄清楚我必须使用 ParDo 函数在我的资源之间拆分作业来解决这个问题,在 ParDo 中我调用我的 DoFn 函数,该函数接受一个元素并执行它所需的所有处理并产生一个 dic。请参阅这篇文章Apache Beam:如何同时创建许多经历相同 PTransform 的 PCollection?

    class ExtractMetadata(beam.DoFn):                                                                                                                                                                                                                                                  
def process(self, element):                                                                                                                                                                                                                                                    
    Takes in a blobName, fetches the blob and its values and returns a dictionary of values                                                                                                                                                                                    
    metadata = element.metadata                                                                                                                                                                                                                                                
    if metadata is not None:                                                                                                                                                                                                                                                   
        event_count = int(metadata['count'])                                                                                                                                                                                                                                   
        event_count = None                                                                                                                                                                                                                                                     

    event_type = self.determine_event_type(element.id)                                                                                                                                                                                                                         
    cluster    = self.determine_cluster(element.id)                                                                                                                                                                                                                            
    customer   = self.determine_customer(element)                                                                                                                                                                                                                              
   # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')                                                                                                                                                                                                   
    #date = date.isoformat()                                                                                                                                                                                                                                                   
    dic = {                                                                                                                                                                                                                                                                    
        'blob_name'       : element.name,                                                                                                                                                                                                                                      
        'event_path'      : element.path,                                                                                                                                                                                                                                      
        'size'            : int(element.size),                                                                                                                                                                                                                                 
        'time_of_creation': element.time_created.isoformat(),                                                                                                                                                                                                                  
        'event_count'     : event_count,                                                                                                                                                                                                                                       
        'event_type'      : event_type,                                                                                                                                                                                                                                        
        'cluster'         : cluster,                                                                                                                                                                                                                                           
        'customer'        : customer                                                                                                                                                                                                                                           
    yield dic
