google-cloud-dataflow - Apache Beam:如何返回多个输出
问题描述
在下面的函数中。我也想返回important_col变量。
class FormatInput(beam.DoFn):
def process(self, element):
""" Format the input to the desired shape"""
df = pd.DataFrame([element], columns=element.keys())
if 'reqd' in df.columns:
important_col= 'reqd'
elif 'customer' in df.columns:
important_col= 'customer'
elif 'phone' in df.columns:
important_col= 'phone'
else:
raise ValueError(['Important columns not specified'])
output = df.to_dict('records')
return output
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
clean_csv = (p
| 'Read input file' >> beam.dataframe.io.read_csv('raw_data.csv'))
to_process = clean_csv | 'pre-processing' >> beam.ParDo(FormatInput())
在上面的管道中,我想从格式输入中返回Important_col变量。
- 一旦我有了那个变量,我想将它作为参数传递给管道中的下一步
- 我还想将to_process转储到 CSV 文件。
我尝试了以下方法,但都没有奏效。
- 将 to_process 转换为 to_dataframe 并尝试 to_csv。我有错误。
- 我还尝试将 pcollection 转储到 csv。我不知道该怎么做。我参考了官方的 apache Beam 文档,但我没有找到任何与我的用例类似的文档。
解决方案
推荐阅读
- cocoapods - .h 文件未为项目中的 CocoaPod 生成
- r - 如何向使用 tidy eval 框架创建的函数添加检查?
- html - 如何从我的计算机将图标添加到本地链接?
- java - 删除程序输出前出现的文本
- css - 让我的网站响应更短的屏幕
- primefaces - 向导,单击选项卡并重定向到它
- amazon-web-services - 如何创建目录以在 AWS Linux 实例上挂载新的 ext4 卷
- sql-server - 为什么我转换部署模型时没有创建项目参数?
- redis - 缺少 Redis 部门
- javascript - 处理来自 PHP 文件的数据后,LaTeX 代码不起作用