首页 > 解决方案 > Apache Beam:如何返回多个输出

问题描述

在下面的函数中。我也想返回important_col变量。

class FormatInput(beam.DoFn):                          
         def process(self, element):                 
             """ Format the input to the desired shape"""                    
             df = pd.DataFrame([element], columns=element.keys())                       
             if 'reqd' in df.columns:
                 important_col= 'reqd'
             elif 'customer' in df.columns:
                 important_col= 'customer'
             elif 'phone' in df.columns:
                 important_col= 'phone'
             else:
                 raise ValueError(['Important columns not specified'])
             output = df.to_dict('records')
             return output

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
    clean_csv = (p 
    | 'Read input file' >>  beam.dataframe.io.read_csv('raw_data.csv'))
    
    to_process = clean_csv | 'pre-processing' >> beam.ParDo(FormatInput())

在上面的管道中,我想从格式输入中返回Important_col变量。

  1. 一旦我有了那个变量,我想将它作为参数传递给管道中的下一步
  2. 我还想将to_process转储到 CSV 文件。

我尝试了以下方法,但都没有奏效。

  1. 将 to_process 转换为 to_dataframe 并尝试 to_csv。我有错误。
  2. 我还尝试将 pcollection 转储到 csv。我不知道该怎么做。我参考了官方的 apache Beam 文档,但我没有找到任何与我的用例类似的文档。

标签: google-cloud-dataflowapache-beamapache-beam-io

解决方案


推荐阅读