How to get an element from a PCollection in Apache Beam

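Problem description

Consider a PCollection of dicts like this:

[{'id': '1', 'name': 'Tom', 'country': 'USA'}, {'id': '2', 'name': 'Oprah', 'country': 'USA'}, ....]

I want to count how many times each country occurs. The result should look like this:

{'USA': 2, 'Tunisia': 3, 'France': 1}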

Tags: python, google-cloud-dataflow, apache-beam

Solution

Take a look at beam.combiners.ToDict: it gathers a PCollection of (key, value) pairs into a single dict.

Example:

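import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Leaving the with block runs the pipeline and waits for it to finish.
with beam.Pipeline(options=PipelineOptions()) as p:
    (
        p
        | "create pcoll" >> beam.Create([
            {'id': '1', 'name': 'Tom', 'country': 'USA'},
            {'id': '2', 'name': 'Oprah', 'country': 'USA'},
            {'id': '2', 'name': 'Oprah', 'country': 'Italy'},
        ])
        # Keep only the country of each record.
        | "map" >> beam.Map(lambda x: x['country'])
        # Emit one (country, count) pair per distinct country.
        | "count" >> beam.combiners.Count.PerElement()
        # Collect all (country, count) pairs into a single dict.
        | "toDict" >> beam.combiners.ToDict()
        | "print" >> beam.Map(print)
    )

# Result (key order may vary): {'USA': 2, 'Italy': 1}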
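
To make it clearer what the "map", "count" and "toDict" steps compute, here is a rough plain-Python sketch of the same logic on the sample data. It does not use Beam at all: collections.Counter is only a local stand-in for Count.PerElement, and the helper name count_by_country is made up for this illustration. It says nothing about how Beam actually executes the pipeline.

```python
from collections import Counter

records = [
    {'id': '1', 'name': 'Tom', 'country': 'USA'},
    {'id': '2', 'name': 'Oprah', 'country': 'USA'},
    {'id': '2', 'name': 'Oprah', 'country': 'Italy'},
]


def count_by_country(rows):
    """Hypothetical local stand-in for Map -> Count.PerElement -> ToDict."""
    # "map" step: keep only the country of each record.
    countries = (row['country'] for row in rows)
    # "count" step: one (country, count) pair per distinct country.
    pairs = Counter(countries).items()
    # "toDict" step: collect the pairs into a single dict.
    return dict(pairs)


result = count_by_country(records)
print(result)
```

The point of the comparison is that Count.PerElement yields (element, count) pairs, which is exactly the input shape ToDict expects.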
