首页 > 解决方案 > Apache BEAM 管道 IllegalArgumentException - 时间戳倾斜?

问题描述

我有一个现有的 BEAM 管道,它正在处理通过 2 条路由摄取的数据(来自 Google Pubsub 主题)。“热”路径执行一些基本转换并将它们存储在数据存储中,而“冷”路径执行固定的每小时窗口,以便在存储之前进行更深入的分析。

到目前为止,管道一直运行良好,直到我开始在发布到 Pubsub 之前对数据进行一些本地缓冲(因此数据到达 Pubsub 可能会“晚”几个小时)。抛出的错误如下:

java.lang.IllegalArgumentException: Cannot output with timestamp 2018-06-19T14:00:56.862Z. Output timestamps must be no earlier than the timestamp of the current input (2018-06-19T14:01:01.862Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:463)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:429)
    at org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn.processElement(WithTimestamps.java:138)

它似乎引用了执行每小时窗口的我的代码部分(withTimestamps 方法),如下所示:

Window<KV<String, Data>> window = Window.<KV<String, Data>>into
                (FixedWindows.of(Duration.standardHours(1)))
    .triggering(Repeatedly.forever(pastEndOfWindow()))
    .withAllowedLateness(Duration.standardSeconds(10))
    .discardingFiredPanes();

PCollection<KV<String, List<Data>>> keyToDataList = eData.apply("Add Event Timestamp", WithTimestamps.of(new EventTimestampFunction()))
    .apply("Windowing", window)
    .apply("Group by Key", GroupByKey.create())
    .apply("Sort by date", ParDo.of(new SortDataFn()));

我不确定我是否完全理解我在这里做错了什么。是因为数据迟到导致错误吗?据我了解,如果数据在允许的延迟之后到达,则应该将其丢弃,而不是像我所看到的那样抛出错误。

想知道设置无限制的 timestampSkew 是否可以解决这个问题?迟到的数据可以免于分析,我只需要确保不会抛出会阻塞管道的错误。我也没有其他地方可以添加/更改数据的时间戳,所以我不确定为什么会抛出错误。

标签: google-cloud-dataflowapache-beamfluentd

解决方案



推荐阅读