python - 如何在返回/收益中停止额外的重复,同时仍然保持给定键的运行总数:值对?
问题描述
将 Pcollection 传递给下一个变换后,变换的回报/产量正在成倍增加,而对于给定的街道和事故计数,我只需要一个 KV 对。
我的理解是生成器可以通过保持值来帮助实现这一点,但这只能解决我的部分问题。我试图在发送到下一个转换之前确定大小,但我没有找到任何方法可以让我得到传递的 Pcollection 元素的真实大小。
class CountAccidents(beam.DoFn):
acci_dict = {}
def process(self, element):
if self.acci_dict.__contains__(element[0]['STREET_NAME']):
self.acci_dict[element[0]['STREET_NAME']] += 1
else:
self.acci_dict.update({element[0]['STREET_NAME']: 1})
if self.acci_dict != {}:
yield self.acci_dict
def run():
with beam.Pipeline() as pl:
test = (pl | 'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
| 'Map Accident' >> beam.ParDo(AccidentstoDict())
| 'Count Accidents' >> beam.ParDo(CountAccidents())
| 'Print to Text' >> beam.io.WriteToText('/letstestthis', file_name_suffix='.txt'))```
Input Pcollection:
[{'CRASH_DATE': '3/25/19 0:25', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'KOSTNER AVE', 'CRASH_HOUR': '0'}]
[{'CRASH_DATE': '3/24/19 23:40', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'ARCHER AVE', 'CRASH_HOUR': '23'}]
[{'CRASH_DATE': '3/24/19 23:30', 'WEATHER_CONDITION': 'UNKNOWN', 'STREET_NAME': 'VAN BUREN ST', 'CRASH_HOUR': '23'}]
I expect to get this:
{'KILPATRICK AVE': 1, 'MILWAUKEE AVE': 1, 'CENTRAL AVE': 2, 'WESTERN AVE': 6, 'DANTE AVE': 1}
What I get is this(a slow build-up till complete):
{'KOSTNER AVE': 1}
{'KOSTNER AVE': 1, 'ARCHER AVE': 1}
{'KOSTNER AVE': 2, 'ARCHER AVE': 2, 'VAN BUREN ST': 1}
解决方案
您需要对每个键进行组合,对于 Count,您可以在此处使用一个:
https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.transforms.combiners.html
在您的读取操作之后,输出一个 KeyValue,它是 {STREET,1},然后是一个 Count per key 转换,它将为您提供街道的全局计数。
例如,如果您想要每周的输出,那么从那里添加窗口功能也很容易。您只需将时间戳和窗口添加到调用中。如何做到这一点的例子在这里:
推荐阅读
- c# - 将 Google AMP(加速移动页面)应用到 ASP.NET Core 站点
- javascript - 如何避免角度的可观察延迟或确保仅在可观察准备就绪时调用我的函数
- reactjs - 升级到 Redux v6
- python - 使用 winsound,播放错误声音而不是选择的声音
- dynamic - 是否可以在 WSO2 的业务模板编辑器中使属性选项动态化?
- jquery - 如何对包含Coldfusion服务器端生成的值而不是jQuery Datatables中的数据库值的列进行排序
- python - 创建一个包含每个事务文件的列表
- haskell - 类型变量条件下的 Haskell 实例
- python - 使用远程解释器在服务器上运行 Django - 防止 Django 创建测试数据库
- php - Phpword - 使用模板处理器添加图像