python - 如何从 Apache Beam 中的 Pcollection 中获取一个元素
问题描述
考虑 Pcollection 的列表:
[{'id':'1','name':'Tom','country':'USA'},{'id':'2','name':'Oprah','country':'USA '}....]
我想统计每个国家的发生情况。结果应该是这样的:
{'美国':2,'突尼斯':3,'法国':1}
解决方案
检查beam.combiners.ToDict,它会产生一个 dict 作为结果;
例子:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
(p
| "create pcoll" >> beam.Create([{'id':'1','name':'Tom','country':'USA'},
{'id':'2','name':'Oprah','country':'USA'},
{'id':'2','name':'Oprah','country':'Italy'}])
| "map" >> beam.Map(lambda x: (x['country']))
| "count" >> beam.combiners.Count.PerElement()
| "toDict" >> beam.combiners.ToDict()
| "print" >> beam.Map(print)
)
p.run()
# Result {'USA': 2, 'Italy': 1}
推荐阅读
- codeigniter - 无法在我的远程服务器上保存生成的 pdf
- php - 使用 php 数组的名称创建新链接
- f# - 显示 str 的次数
- ios - 等待两个 NSOperation 完成而不阻塞 UI 线程
- python - 多个键按下 Pygame
- reactjs - 如何在 Reactjs 中将文本 + 图标添加到 HTML 表格行元素
- python - 通过环境将密码安全地传递给 subprocess.Popen
- node.js - 如何在测试中获取 serverless.yml 中定义的环境变量
- python - Python找不到Powershell的路径
- python - 如何从谷歌云存储桶下载文件?