首页 > 解决方案 > Flink - 事件时间滑动窗口,由于时间间隔而在窗口中丢失数据

问题描述

假设我有一系列股票市场交易事件,如下所示:

technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . . 
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018

使得 TechnicalN(其中 N 是某个数字)表示每日收盘股票的第 N 个技术交易条目 [Open (float), High(float), Low (float), Close (float), Volume (int)]给定公司的市场交易数据。(即,股票代码 GOOG 的技术 1 与股票代码 MSFT 的技术 1 不同。)例如:

12.52, 19.25, 09.11, 17.54, 120532, GOOG, 1/1/2017
14.37, 29.52, 01.53, 12.96, 627156, MSFT, 1/1/2017

(请注意,这些交易价格/交易量完全是虚构的。)

假设我想创建一个大小为 2 且间隔为 1 天的窗口,以便我们的数据看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .

这会很好,但这是有问题的,因为股票市场交易日期不是连续的......换句话说,如果我正确理解 Flink 的机制(我可能是错的),使用事件时间滑动窗口的问题就像这个:

DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);

这样的数据是日期值不连续(意味着它们遵循包含一个或多个缺失天的不连续性的离散序列),因为没有股票市场关闭日期的股票市场数据,例如节假日或周末。因此,考虑到这一点,我们的流实际上最终看起来更像这样(因为交易在 2017 年 12 月 30 日、2017 年 12 月 31 日和 2018 年 1 月 1 日关闭):

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

如何让我的 Flink 流忽略丢失的日期(而是将连续的非丢失日期窗口或连接或映射在一起),以便我的流看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

?

(注意:请忽略我通过字符串“技术”(如技术 1、技术 2 等)增加数字的方式,因为正如我已经提到的那样,该值仅用于本文中的描述目的,而不是实际存在于数据中。确定两个交易条目是否连续的唯一方法是按代码对它们进行分组并按交易日期对其进行排序。假设不存在重复的事件。)

标签: javaapache-flinkflink-streamingflink-cepstream-processing

解决方案


如果我理解正确,您的问题是因为在某些特定时期您没有收到事件,那么窗口将无法正常运行,因为它们不知道时间的流逝。

您可以选择定期发出水印,如下所示:

streamEnvironment.addSource(new SourceFunction<Object>() {
        @Override
        public void run(final SourceContext<Object> ctx) {
            (...)

            ctx.emitWatermark(new Watermark(timestamp));
        }

        @Override
        public void cancel() {

        }
    })

请记住,如果您在水印之前收到事件,它们将被忽略,因此您的水印发射的周期性是“窗口准确性”(尽快触发)和容忍迟到事件之间的权衡。


推荐阅读