首页 > 解决方案 > Apache Flink:如何将自定义逻辑应用于延迟事件?

问题描述

尽管 Flink 有一些内置的工具来处理迟到的数据,比如allowed lateness,我还是想自己处理迟到的数据。例如,我想监控迟到的事件或只是将它们保存到数据库中。

我怎样才能做到这一点?

标签: streamingapache-flinkflink-streaming

解决方案


通常在窗口运算符中使用延迟和水印。如果您使用的是窗口运算符,则可以像这样使用侧输出:

val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)

并从侧面输出中获取后期元素,如下所示:

windowStream.getSideOutput(lateOutputTag).print()

推荐阅读