python - 在 Apache Beam 中为每个键应用有状态的 DoFn
问题描述
是否可以仅将状态转换应用于键控 PCollection 中的值?
例如,假设此 PCollection 键入邮政编码。这些值是包含 user_id 键的字典。在这个有状态的 DoFn 中,我想跟踪每个邮政编码看到的所有 user_id。但是,鉴于邮政编码的数量庞大,将所有邮政编码、user_id 对存储在状态中变得难以处理。但是,如果我只对每个键应用这个有状态的 DoFn,那么我不需要将邮政编码显式存储在 state 中。
从 Python 文档来看,这似乎是不可能的。最好的方法是滥用自定义的 CombineFn 吗?
谢谢!
解决方案
我认为您想要的是CombinePerKey。它仅对值或每个键应用 CombineFn。
此外,在使用 Combine 时考虑 Reduce 阶段也很重要。
希望这个例子对你有所帮助。(添加了打印,以便您可以看到 Reduce 阶段以及ifs的原因)
with beam.Pipeline(options=pipeline_options) as p:
keyed_elements = [
(47001, {"user_id": 1, "fake_key":"fake_value"}),
(47001, {"user_id": 2, "fake_key": "fake_value"}),
(47002, {"user_id": 3, "fake_key": "fake_value"}),
(47002, {"user_id": 4, "fake_key": "fake_value"}),
(47003, {"user_id": 5, "fake_key": "fake_value"}),
(47001, {"user_id": 6, "fake_key": "fake_value"}),
(47001, {"user_id": 7, "fake_key": "fake_value"}),
(47001, {"user_id": 8, "fake_key": "fake_value"}),
(47001, {"user_id": 9, "fake_key": "fake_value"}),
(47001, {"user_id": 10, "fake_key": "fake_value"}),
(47001, {"user_id": 11, "fake_key": "fake_value"}),
]
def group_users(elements_values):
#to test paralellism in reduce phase
print(f"ELEMENT: {elements_values}")
final_output = []
for value in elements_values:
if isinstance(value, dict):
final_output.append(value['user_id'])
elif isinstance(value, list):
final_output += value
else:
pass
return final_output
(p | Create(keyed_elements)
| beam.CombinePerKey(group_users)
| Map(print)
)
推荐阅读
- sql - 无法按日期时间排序设置行号
- php-7 - PHP 7 - 如何捕捉“不能使用类型的对象......作为数组”?
- python - 在批处理文件中编辑给定变量
- python - 文本挖掘,自定义词形还原
- sql - oracle apex 应用程序进程 PLSQL 写入使用存储在 DB 表中的 BLOB 布局打印报告
- javascript - 如何在javascript中从位图中获取或分离绘图对象,线条,椭圆等对象
- javascript - 获取 404 未找到 Ajax 向 struts 发布请求的 HTTP 状态操作
- php - 我获取最后插入 ID 的代码有什么问题?
- python - 检查方法是否被导入,从 X 导入 Y
- java - Spring Cloud Bus Kafka Bean 配置