python - 如何通过侧面输入将两个 Pcollections(各种大小/数据)与一个公共“键”(Street)合并?
问题描述
我有两个 PCollection:一个从 Pub/Sub 中提取信息,一个从 CSV 文件中提取数据。在每个管道中进行了一些不同的转换之后,我想将两者合并到一个它们共享的公共键上,“STREET”。我将第二个 PCollection 作为侧面输入。但是,我在尝试运行时遇到错误。
我尝试使用 CoGroupByKey,但我一直收到有关 Pcollections 中数据类型差异的错误。我尝试重构输出,并通过设置 PCollection 的属性__setattr__
以强制类型相等,但无论如何它都会报告“混合值”。经过进一步研究,似乎最好利用侧输入,特别是当元素之间的数据大小存在差异时。即使有侧面输入,我仍然无法克服当前的错误:
from_runner_api raise ValueError('No producer for %s' % id)
ValueError: No producer for ref_PCollection_PCollection_6
我的应用逻辑如下:
def merge_accidents(element, pcoll):
print(element)
print(pcoll)
"some code that will append to existing data"
accident_pl = beam.Pipeline()
accident_data = (accident_pl |
'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
| 'Map Accidents' >> beam.ParDo(AccidentstoDict())
| 'Count Accidents' >> Count.PerKey())
chi_traf_pl = beam.Pipeline(options=pipeline_options)
chi_traffic = (chi_traf_pl | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
| 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
| 'TimeDelayEnrich' >> beam.Map(timedelay)
| 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
| 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=AsDict(accident_data))
| 'Temp Write'>> beam.io.WriteToText('testtime', file_name_suffix='.txt'))
accident_pl.run()
chi_result = chi_traf_pl.run()
chi_result.wait_until_finish()```
**Pcoll 1:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15'}]
**Pcoll 2:**
('MILWAUKEE AVE', 1)
('CENTRAL AVE', 2)
('WESTERN AVE', 6)
**Expected:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15', 'accident_count': '6'}]
**Actual Results:**
"from_runner_api raise ValueError('No producer for %s' % id)ValueError: No producer for ref_PCollection_PCollection_6
解决方案
所以我发现了问题所在。在查看了 pipeline.py 和 unittest 源以获取侧面输入后,我意识到对创建的 Pipeline 对象进行了检查。
我是新手,所以我最初认为您需要创建两个单独的 Pipeline 对象(流式与批处理),以便我可以将不同的选项传递给两者;iestreaming:是的。话虽如此,我不认为这是必要的。
将它们合并到如下所示的单个对象后,错误消失了,我能够接受函数的侧面输入:
'''
pipeline = beam.Pipeline(options=pipeline_options)
accident_data = (pipeline
| 'Read' >> beam.io.ReadFromText('modified_Excel_Crashes_Chicago.csv')
| 'Map Accidents' >> beam.ParDo(AccidentstoDict())
| 'Count Accidents' >> Count.PerKey())
chi_traffic = (pipeline
| 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
| 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
| 'TimeDelayEnrich' >> beam.Map(timedelay)
| 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
| 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=pvalue.AsDict(accident_data))
| 'Temp Write' >> beam.io.WriteToText('testtime',
file_name_suffix='.txt'))
chi_result = pipeline.run()
chi_result.wait_until_finish()
'''
推荐阅读
- amazon-web-services - 如何使用侦听器和任务配置目标组端口?
- windows - 使用 IrfanView 在 Windows 10 中显示图像命令行
- javascript - AlpineJs,向元素添加动态ID并比较鼠标悬停事件
- python - Python 错误,使用 mysqldb 模块的 sql 语句出现 1064
- c# - ToolStripControlHost 的 AutoSize 无法正常工作
- reactjs - pnp/sp PeoplePicker 中的必需属性不起作用
- mysql - 联合查询的总百分比
- java - 如何将节点的边界固定到其中的内容
- r - 如何折叠列表中的值以允许将数据框中的列表列转换为向量?
- ios - 将数据添加到 AnyObject Var 的问题,以便我可以制作原生广告