首页 > 解决方案 > Onyx 中的水印触发器不会触发

问题描述

我有一个 Onyx 段流,它是带有时间戳的消息(按时间顺序排列)。说,它们看起来像这样:

{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:id 2 :timestamp "2018-09-04 21:32:03" :msg "Lorem ipsum"}
{:id 3 :timestamp "2018-09-05 03:01:52" :msg "Dolor sit amet"}
{:id 4 :timestamp "2018-09-05 09:28:16" :msg "Consetetur sadipscing"}
{:id 5 :timestamp "2018-09-05 12:45:33" :msg "Elitr sed diam"}
{:id 6 :timestamp "2018-09-06 08:14:29" :msg "Nonumy eirmod"}
...

对于数据中的每个时间窗口(一天),我想对其所有段的集合运行计算。即,在示例中,我想对 id 为 1 和 2(9 月 4 日)的段进行操作,接下来对 id 3、4 和 5(9 月 5 日)进行操作,依此类推。

Onyx 提供窗口和触发器,它们应该可以做我想要的。如果我使用一个窗口:window/type :fixed并聚合:window/range [1 :day]关于:window/window-key :timestamp,我将聚合每一天的所有部分。

为了只在一天的所有时段都到达时触发我的计算,Onyx 提供了触发行为:onyx.triggers/watermark。根据文档,它应该开火

如果:window/window-key段中的值超过活动窗口范围的上限

但是,触发器不会触发,即使我可以看到后面的段已经进入并且几个窗口应该已满。作为健全性检查,我尝试了一个简单的:onyx.triggers/segment触发器,它按预期工作。


我创建一个最小示例的失败尝试:

我修改了固定的 windows 玩具作业来测试水印触发,它在那里工作

但是,我发现在这个玩具作业中,触发水印触发器的原因可能是:

它是否关闭了输入通道?也许刚刚完成的工作也可以触发水印。


与水印触发交互的另一个方面是对等点对任务的分布式工作

Onyx repo 中对issue #839 ( :trigger/emitnot working with ) 的评论指出我在:onyx.triggers/watermarkissue #840 ( Watermark doesn't work with Kafka topic has > 1 partition ),在那里我发现了这个线索(强调我的):

问题是您的所有数据都在一个分区上结束,并且水印始终采用所有输入对等点的最小水印(如果使用本机 kafka 水印,则为给定对等点的最小水印)。

当您使用少量数据和自动分区分配调用 g/send 时,您的所有数据最终都在一个分区上,这意味着另一个分区的对等方继续发出0 的水印


发现

不可能将它与当前的水印触发器一起使用,它依赖于输入源。您可以尝试提取以前的水印实现 [...]

但是,在我的任务图中,我想在 windows 中聚合的段仅在某些中间任务中创建,它们并非源自输入任务本身。输入段仅向该中间任务提供如何创建/检索段内容的信息。

同样,这种结构在上述玩具工作中也能正常工作。原因是输入通道在某个时间点关闭,从而结束工作,进而触发水印。所以我的玩具示例实际上不是一个好的模型,因为它不是一个开放式流。

如果作业确实从实际输入源获得了有问题的片段,但没有时间戳,Onyx 似乎提供了指定 a 的空间assign-watermark-fn,这是输入任务的可选属性。该函数在每次到达新段时设置水印。就我而言,这无济于事,因为这些段并非源自输入任务。

标签: clojurewindowingonyx-platform

解决方案


我现在自己想出了一个解决方法。该文档基本上提供了如何做到这一点的线索:

这是标点符号触发器的快捷功能,当任何一条数据具有高于另一个范围的基于时间的窗口键时触发,有效地声明将不再有更早窗口的数据到达。

所以我改变了发射段的任务,这样每个段都会发出另一个像段一样的“哨兵”:

[{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:timestamp "2018-09-03 13:15:42" :over :out}]

请注意,:timestamp它早于窗口范围(此处为 1 天)。所以它将被发送到上一个窗口。由于我的数据是按时间顺序输入的,:punctuation触发器可以从“哨兵”段(带有关键字:over)的存在中判断窗口可以关闭。不要忘记逐出(即,:trigger/post-evictor [:all])并从最终窗口中丢弃“哨兵”段。添加:onyx/max-peers 1任务地图可确保哨兵始终最终到达,尤其是在使用分组时。

请注意,此解决方法有两个假设:

  1. 数据按时间顺序排列
  2. 没有没有段的窗户

推荐阅读