python - Apache Beam - Python:如何通过累积获得 PCollection 的前 10 个元素?
问题描述
我想像这样提取前 10 名的最高分:
Paul - 38
Michel - 27
Hugo - 27
Bob - 24
Kevin - 19
...
(10 elements)
我正在使用一个固定窗口和一个数据驱动的触发器,它在窗格收集 X 元素后输出早期结果。另外,我正在使用组合器来获得前 10 名的最高分。
(inputs
| 'Apply Window of time' >> beam.WindowInto(
beam.window.FixedWindows(size=5 * 60))
trigger=trigger.Repeatedly(trigger.AfterCount(2)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
| beam.ParDo(PairWithOne()) # ('key', 1)
| beam.CombinePerKey(sum)
| 'Top 10 scores' >> beam.CombineGlobally(
beam.combiners.TopCombineFn(n=10,
compare=lambda a, b: a[1] < b[
1])).without_defaults())
这里的问题是第一个输出似乎是正确的,但以下输出包含重复的键:
Paul - 38
Paul - 37
Michel - 27
Paul - 36
Michel - 26
Kevin - 20
...
(10 elements)
如您所见,我没有得到 10 个不同的 K/V 对,而是重复的密钥。
当不使用触发/累积策略时,这很有效..但如果我想有 2 小时的窗口,我想获得频繁更新...
解决方案
正如评论中所讨论的,一种可能性是转换到Discardingfired panes,可以通过设置accumulation_mode=trigger.AccumulationMode.DISCARDING
。如果您仍想保留该ACCUMULATING
模式,您可能需要修改TopCombineFn
,以便来自同一用户的重复窗格覆盖以前的值并避免重复键。TopDistinctFn
将以Beam SDK 2.13.0 的代码为基础。在该add_input
方法中,我们将按如下方式进行先前的检查:
for current_top_element in enumerate(heap):
if element[0] == current_top_element[1].value[0]:
heap[current_top_element[0]] = heap[-1]
heap.pop()
heapq.heapify(heap)
基本上,我们将比较我们正在评估的元素的键 ( element[0]
) 与堆中的每个元素。堆元素是类型的ComparableValue
,因此我们可以使用value
它来取回元组(并value[0]
获取密钥)。如果它们匹配,我们将希望将其从堆中弹出(因为我们正在累积总和会更大)。Beam SDK 使用该heapq
库,因此我基于此答案的方法来删除i-th
元素(我们enumerate
用来保留索引信息)。
我添加了一些日志记录以帮助检测重复项:
logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))
该代码位于top.py
文件夹内的文件中combiners
(带有__init__.py
),我将其导入:
from combiners.top import TopDistinctFn
然后,我可以TopDistinctFn
像这样在管道中使用:
(inputs
| 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
| 'Apply Window of time' >> beam.WindowInto(
beam.window.FixedWindows(size=10*60),
trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
| 'Sum Score' >> beam.CombinePerKey(sum)
| 'Top 10 scores' >> beam.CombineGlobally(
TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
| 'Print results' >> beam.ParDo(PrintTop10Fn()))
完整的代码可以在这里找到。generate_messages.py
是 Pub/Sub 消息生成器,top.py
包含TopCombineFn
改名的自定义版本TopDistinctFn
(可能看起来势不可挡,但我只添加了从第 425 行开始的几行代码)和主管test_combine.py
道代码。要运行它,您可以将文件放在正确的文件夹中,如果需要,安装 Beam SDK 2.13.0,在 和 中修改项目 ID 和 Pub/Subgenerate_messages.py
主题test_combine-py
。然后,使用 : 开始发布消息,并在不同的 shell中python generate_messages.py
运行管道DirectRunner
。您可能需要使用文件添加python test_combine.py --streaming
额外的文件。DataflowRunner
setup.py
例如,Bob
他以 9 分领先,下一次更新时,他的得分高达 11 分。他将出现在下一次回顾中,只有更新的分数,没有重复(在我们的日志中检测到)。9 分的条目将不会出现,并且顶部仍将根据需要有 10 个用户。同样对于Marta
. 我注意到即使不在前 10 名中,旧分数仍然出现在堆中,但我不确定垃圾收集如何与heapq
.
INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]
让我知道这是否也适用于您的用例。
推荐阅读
- flask - WSGI 错误 - Flask API 上的 Werkzeug 错误
- javascript - Reactjs react-bootstrap 模态“容器”属性
- ios - 我如何知道我的应用程序是否通过 VPN 进行通信?
- mqtt - 时序见解未显示键/值对的子对象属性
- typescript - 多个文件中具有相同名称的打字稿函数
- reactjs - 为什么在 Next.js 中使用响应式条件渲染会出现渲染失真错误?
- reactjs - 有没有办法在 url 更改时更新在单个 spa 中注册的应用程序的历史位置?
- discord - 我想检查是否有人有被踢的权限,但答案是错误的
- filter - 赛普拉斯:有没有办法将过滤器与正则表达式一起使用?
- c# - 使用 SQL Server 中的数据填充 Blazor/Razor 中的下拉列表