首页 > 解决方案 > Kafka 流时间和窗口过期 - KStreamSessionWindowAggregate 跳过记录

问题描述

我是 Kafka-stream 的新手,我正在试验它来处理大量消息。

设想

传入的有效载荷结构是:

"building-<M>, sensor-<N>.<parameter>, value, timestamp". 

例如:

"building-1, sensor-1.temperature, 18, 2020-06-12T15:01:05Z"
"building-1, sensor-1.humidity, 75, 2020-06-12T15:01:05Z"
"building-1, sensor-2.temperature, 20, 2020-06-12T15:01:05Z"
"building-1, sensor-2.humidity, 70, 2020-06-12T15:01:05Z"

kafka 中的消息键是 building-id。

Stream 将其转换为 POJO 以进行进一步的下游处理:

SensorData {
  buildingId = "building-1"
  sensorId = "sensor-1"
  parameterName = "temperature"
  parameterValue = 18
  timestamp = 1592048743000
  ..
  ..
}

每个传感器将同时发送其所有参数作为单独的记录。每组饲料每 5 分钟从每个传感器发出一次。

时间戳提取器设置为从有效负载中获取时间。如果记录上的时间戳偏离(例如与当前流时间相差 1 小时),它也会拒绝记录

在我的拓扑中,有一次,我想执行一个聚合操作,将来自一个传感器的所有数据结合起来。例如,在上面的示例中,我想使用该传感器报告的温度和湿度为每个传感器执行聚合。

拓扑

我使用“buildingId”和“sensorId”做了一个组,然后应用 2 分钟间隔的会话窗口和 1 分钟的宽限期。

kStreamBuilder
    .stream("building-sensor-updates", ...)
    //Had to cleanup key and also needed some data from context
    .tranform(() -> new String2SensorObjectConvertor()) 
     //triggers another re-partition
    .groupBy((key, value) -> value.buildingId + "-" + value.sensorId, ...)
    .windowedBy(SessionWindows.with(..))
    .aggregate(
            () -> new SensorDataAggregator, 
            ...,
            Materialized.<String, SensorDataAggregator, 
              SessionStore<Bytes, byte[]>>as("session_aggregate_store"))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    ...
    ...

正如预期的那样,这会触发重新分区,并且子流将使用来自此重新分区主题“sensor_data_processor-session_aggregate_store-repartition”的记录。正如稍后解释的那样,我在那里看到了一个问题。

测试输入数据

我正在测试一个场景,其中过去的数据再次从存储或 Kafka 偏移中重新处理。为了测试,我使用 Kafka-spool-connect 从 csv 提供数据。输入 CSV 文件中每条记录的时间戳按升序保存。对于同一个传感器,下一组记录的时间戳将增加 5 分钟。

"building-1, sensor-1.temperature, 18, 2020-06-12T15:01:02Z"
"building-1, sensor-1.humidity, 75, 2020-06-12T15:01:05Z"
"building-1, sensor-2.temperature, 20, 2020-06-12T15:01:03Z"
"building-1, sensor-2.humidity, 70, 2020-06-12T15:01:06Z"
"building-1, sensor-1.temperature, 19, 2020-06-12T15:06:04Z"
"building-1, sensor-1.humidity, 65, 2020-06-12T15:06:08Z"
"building-1, sensor-2.temperature, 21, 2020-06-12T15:06:05Z"
"building-1, sensor-2.humidity, 73, 2020-06-12T15:06:09Z"

我毫不拖延地批量注入测试数据(200000)。

问题

当子流处理来自此重新分区主题的记录时,我看到来自 KStreamSessionWindowAggregate 的以下警告消息并且记录被跳过。

警告 org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate - 跳过过期窗口的记录。key=[BUILDING-ID-1003-sensor-1] topic=[sensor_data_processor-session_aggregate_store-repartition] partition=[0] offset=[1870] timestamp=[1591872043000] window=[1591872043000,1591872043000] expire=[1591951243000] streamTime =[1591951303000]

如果您查看警告消息中的时间戳,

我尝试提前 2 分钟使用 7 分钟的时间窗口。我在那里也有类似的问题。

观察

  1. 由于原始消息的键是“building-id”,因此来自同一建筑物(以及因此相同的传感器)的所有记录应该进入一个分区,并且来自每个传感器的记录应该是有序的。

  2. 我也在拓扑的开头做一个 transform() 。我必须清理密钥并且还想要一些来自上下文的数据。尽管这可能会触发重新分区,但这不应该改变传感器中记录的顺序,因为它只清理键,因此分区结果将在分区中保持相同的元素。我将通过一些优化摆脱这个 tranform() 。

  3. 我的窗口分组基于 building-id + sensor-id,因此每个重新分区组中来自同一传感器的元素也应该按顺序排列。

鉴于这一切,我希望每个分区/组的流时间将根据该分区中事件的时间戳单调地进行,因为它们的顺序保持不变。但是我看到了流时间的跳跃。我查看了org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate和一些 kafka-stream 文档 -

在我看来,单调的流时间是为流任务而不是每个分区维护的。并且相同的流任务可以用于处理多个主题分区。因为记录是快速连续注入的,它可能会处理来自一个分区的大量记录,当它拿起另一个主题分区时,与新主题分区中记录的时间戳相比,流时间可能已经跨越了很多将导致过期。

问题

  1. 对于像这样重放记录,除了为窗口设置一个大的宽限期之外,如何处理它。

  2. 即使在实时场景中,如果存在背压,也可能会发生此问题。使用大的宽限期不是一种选择,因为我使用 Suppresed.untilWindowClose() 时结果会延迟。处理这个问题的最佳方法是什么?

  3. 如果为流任务维护流时间并且同一任务可能用于多个主题分区,那么我们是否可以在流任务和主题分区之间保持 1-1 映射和粘性?如果是这样,除了潜在的性能问题之外还有什么影响?

  4. 为什么 kafka-stream 不为主题分区而不是每个流任务维护流时间?

  5. 当我查看警告消息中提到的“sensor_data_processor-session_aggregate_store-re-partition”主题时,我看到大多数“温度”记录都单独发布到该主题(是的,对于每个组,“温度”排在第一位测试数据集)。为什么只有温度记录进入该主题?只是时间巧合吗?

标签: apache-kafka-streamsapache-kafka-connect

解决方案


对于像这样重放记录,除了为窗口设置一个大的宽限期之外,如何处理它。

我想你不能。如果您处理今天的数据,然后处理昨天的数据,昨天的数据将被丢弃。你可以做什么,开始一个的应用程序。对于这种情况,应用程序在启动时没有流时间,因此它将以“昨天”初始化其流时间,因此不会丢弃数据。

即使在实时场景中,如果存在背压,也可能会发生此问题。使用大的宽限期不是一种选择,因为我使用 Suppresed.untilWindowClose() 时结果会延迟。处理这个问题的最佳方法是什么?

好吧,你必须选择你的毒药......或者你回到处理器 API 并手动实现你需要的任何逻辑。

如果为流任务维护流时间并且同一任务可能用于多个主题分区,那么我们是否可以在流任务和主题分区之间保持 1-1 映射和粘性?如果是这样,除了潜在的性能问题之外还有什么影响?

每个任务肯定会维护流时间,并且任务和分区之间存在 1:1 映射。也许数据被意外洗牌。My window grouping is based on building-id + sensor-id, so the elements from same sensor in each re-partitioned group also should be coming in order.:同意,但是,数据仍将被洗牌;因此,如果一个上游任务处理数据的速度比它的“并行”梨快,那么如果所有下游任务也一样,它将导致流时间的快速推进。

为什么 kafka-stream 不为主题分区而不是每个流任务维护流时间?

不确定我是否可以跟随。每个任务单独跟踪流时间。任务和分区之间存在 1:1 的映射关系。因此,似乎两者(跟踪每个分区或跟踪每个任务——假设每个任务只有一个输入分区)是相同的。


推荐阅读