首页 > 解决方案 > 如何阻止高负载导致级联 Flink 检查点故障

问题描述

有几点我会预先自愿:

  1. 我是 Flink 的新手(现在已经使用了大约一个月)
  2. 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。无论如何,这并没有真正限制 Flink 的多功能性或容错选项,但我还是会说出来。

我们有一个相当直接的滑动窗口应用程序。键控流通过特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒,我们计算窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不占用内存。

有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情开始变得棘手。此时,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但根据这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。

令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。也许 10 或 12 个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有新的事件被处理等等。

在这一点上,我尝试了一些事情:

  1. 在问题上扔更多的金属(自动缩放也打开了)
  2. 大惊小怪 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. 重构以减少我们存储的状态的足迹

(1) 并没有真正做太多。(2) 这似乎有所帮助,但随后又一次比我们之前看到的更大的流量高峰消除了任何好处 (3) 目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用与你想象的 Yelp 或 Airbnb 相比相当小,它们都使用 Flink 集群来处理大型应用程序,所以我无法想象我的状态真的有问题。

我会说我希望我们不必深刻改变对应用程序输出的期望。这个滑动窗口是一个非常有价值的数据。

编辑:有人问我的状态是什么样的 ValueState[FooState]

case class FooState(
                         entityType: String,
                         entityID: String,
                         events: List[BarStateEvent],
                         tableName: String,
                         baseFeatureName: String,
                       )

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)

编辑:我想强调用户大卫安德森在评论中所说的话:

有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。

这是必不可少的。对于其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储在某个时间片中。我的最终解决方案是将事件分成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载期,检查点保持在 3mb 并且它们总是在一秒钟内完成。

标签: apache-flinkamazon-kinesisamazon-kinesis-analytics

解决方案


如果您有一个 24 小时长的滑动窗口,并且它滑动 30 秒,那么每次登录都会分配给 2880 个单独的窗口中的每一个。没错,Flink 的滑动窗口是复制的。在本例中为 24 * 60 * 2 份。

如果您只是计算登录事件,则无需实际缓冲登录事件,直到窗口关闭。您可以改为使用 aReduceFunction来执行增量聚合

我的猜测是您没有利用此优化,因此当您有一个热键(IP 地址)时,处理该热键的实例具有不成比例的数据量,并且需要很长时间才能检查点。

另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地了解原因。

一种可能的补救方法是使用ProcessFunction. 通过这样做,您可以避免维护 2880 个单独的窗口,并使用更有效的数据结构。

编辑(基于更新的问题):

我认为问题在于:使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每个状态访问和更新都必须经过 ser/de。这意味着您List[BarStateEvent]正在反序列化,然后每次修改它时重新序列化。对于列表中包含 200k 事件的 IP 地址,这将是非常昂贵的。

你应该做的是使用ListStateor 或MapState. 这些状态类型针对 RocksDB 进行了优化。RocksDB 状态后端可以在ListState不反序列化列表的情况下追加。并且MapState,map 中的每个键/值对都是一个单独的 RocksDB 对象,允许高效的查找和修改。

有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在Flink docs中有一个做类似事情的例子(但有翻滚的窗口)。

或者,如果您的状态可以放入内存,您可以使用 FsStateBackend。然后你的所有状态都将是 JVM 堆上的对象,而 ser/de 只会在检查点和恢复期间发挥作用。


推荐阅读