首页 > 解决方案 > Apache Beam - Python:如何通过累积获得 PCollection 的前 10 个元素?

问题描述

我想像这样提取前 10 名的最高分:

Paul - 38
Michel - 27
Hugo - 27
Bob - 24
Kevin - 19
...
(10 elements)

我正在使用一个固定窗口和一个数据驱动的触发器,它在窗格收集 X 元素后输出早期结果。另外,我正在使用组合器来获得前 10 名的最高分。

(inputs
         | 'Apply Window of time' >> beam.WindowInto(
                        beam.window.FixedWindows(size=5 * 60))
                        trigger=trigger.Repeatedly(trigger.AfterCount(2)),
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         | beam.ParDo(PairWithOne()) # ('key', 1)
         | beam.CombinePerKey(sum)
         | 'Top 10 scores' >> beam.CombineGlobally(
                        beam.combiners.TopCombineFn(n=10,
                                                    compare=lambda a, b: a[1] < b[
                                                        1])).without_defaults())

这里的问题是第一个输出似乎是正确的,但以下输出包含重复的键:

Paul - 38
Paul - 37
Michel - 27
Paul - 36
Michel - 26
Kevin - 20
...
(10 elements)

如您所见,我没有得到 10 个不同的 K/V 对,而是重复的密钥。

当不使用触发/累积策略时,这很有效..但如果我想有 2 小时的窗口,我想获得频繁更新...

标签: pythongoogle-cloud-dataflowapache-beam

解决方案


正如评论中所讨论的,一种可能性是转换到Discardingfired panes,可以通过设置accumulation_mode=trigger.AccumulationMode.DISCARDING。如果您仍想保留该ACCUMULATING模式,您可能需要修改TopCombineFn,以便来自同一用户的重复窗格覆盖以前的值并避免重复键。TopDistinctFn将以Beam SDK 2.13.0 的代码为基础。在该add_input方法中,我们将按如下方式进行先前的检查:

for current_top_element in enumerate(heap):
  if element[0] == current_top_element[1].value[0]:
    heap[current_top_element[0]] = heap[-1]
    heap.pop()
    heapq.heapify(heap)

基本上,我们将比较我们正在评估的元素的键 ( element[0]) 与堆中的每个元素。堆元素是类型的ComparableValue,因此我们可以使用value它来取回元组(并value[0]获取密钥)。如果它们匹配,我们将希望将其从堆中弹出(因为我们正在累积总和会更大)。Beam SDK 使用该heapq库,因此我基于此答案的方法来删除i-th元素(我们enumerate用来保留索引信息)。

我添加了一些日志记录以帮助检测重复项:

logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))

该代码位于top.py文件夹内的文件中combiners(带有__init__.py),我将其导入:

from combiners.top import TopDistinctFn

然后,我可以TopDistinctFn像这样在管道中使用:

(inputs
     | 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
     | 'Apply Window of time' >> beam.WindowInto(
                    beam.window.FixedWindows(size=10*60),
                    trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
                    accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
     | 'Sum Score' >> beam.CombinePerKey(sum)   
     | 'Top 10 scores' >> beam.CombineGlobally(
                    TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
     | 'Print results' >> beam.ParDo(PrintTop10Fn()))

完整的代码可以在这里找到。generate_messages.py是 Pub/Sub 消息生成器,top.py包含TopCombineFn改名的自定义版本TopDistinctFn(可能看起来势不可挡,但我只添加了从第 425 行开始的几行代码)和主管test_combine.py道代码。要运行它,您可以将文件放在正确的文件夹中,如果需要,安装 Beam SDK 2.13.0,在 和 中修改项目 ID 和 Pub/Subgenerate_messages.py主题test_combine-py。然后,使用 : 开始发布消息,并在不同的 shell中python generate_messages.py运行管道DirectRunner。您可能需要使用文件添加python test_combine.py --streaming额外文件。DataflowRunnersetup.py

例如,Bob他以 9 分领先,下一次更新时,他的得分高达 11 分。他将出现在下一次回顾中,只有更新的分数,没有重复(在我们的日志中检测到)。9 分的条目将不会出现,并且顶部仍将根据需要有 10 个用户。同样对于Marta. 我注意到即使不在前 10 名中,旧分数仍然出现在堆中,但我不确定垃圾收集如何与heapq.

INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]

让我知道这是否也适用于您的用例。


推荐阅读