首页 > 解决方案 > 在 Apache Beam 中为每个键应用有状态的 DoFn

问题描述

是否可以仅将状态转换应用于键控 PCollection 中的值?

例如,假设此 PCollection 键入邮政编码。这些值是包含 user_id 键的字典。在这个有状态的 DoFn 中,我想跟踪每个邮政编码看到的所有 user_id。但是,鉴于邮政编码的数量庞大,将所有邮政编码、user_id 对存储在状态中变得难以处理。但是,如果我只对每个键应用这个有状态的 DoFn,那么我不需要将邮政编码显式存储在 state 中。

从 Python 文档来看,这似乎是不可能的。最好的方法是滥用自定义的 CombineFn 吗?

谢谢!

标签: pythongoogle-cloud-dataflowapache-beam

解决方案


我认为您想要的是CombinePerKey。它仅对值或每个键应用 CombineFn。

此外,在使用 Combine 时考虑 Reduce 阶段也很重要。

希望这个例子对你有所帮助。(添加了打印,以便您可以看到 Reduce 阶段以及ifs的原因)

with beam.Pipeline(options=pipeline_options) as p:

    keyed_elements = [
        (47001, {"user_id": 1, "fake_key":"fake_value"}),
        (47001, {"user_id": 2, "fake_key": "fake_value"}),
        (47002, {"user_id": 3, "fake_key": "fake_value"}),
        (47002, {"user_id": 4, "fake_key": "fake_value"}),
        (47003, {"user_id": 5, "fake_key": "fake_value"}),
        (47001, {"user_id": 6, "fake_key": "fake_value"}),
        (47001, {"user_id": 7, "fake_key": "fake_value"}),
        (47001, {"user_id": 8, "fake_key": "fake_value"}),
        (47001, {"user_id": 9, "fake_key": "fake_value"}),
        (47001, {"user_id": 10, "fake_key": "fake_value"}),
        (47001, {"user_id": 11, "fake_key": "fake_value"}),
    ]

    def group_users(elements_values):

        #to test paralellism in reduce phase
        print(f"ELEMENT: {elements_values}")


        final_output = []
        for value in elements_values:
            if isinstance(value, dict):
                final_output.append(value['user_id'])
            elif isinstance(value, list):
                final_output += value
            else:
                pass

        return final_output

    (p | Create(keyed_elements)
       | beam.CombinePerKey(group_users)
       | Map(print)
     )

推荐阅读