apache-flink - 如何阻止高负载导致级联 Flink 检查点故障
问题描述
有几点我会预先自愿:
- 我是 Flink 的新手(现在已经使用了大约一个月)
- 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。无论如何,这并没有真正限制 Flink 的多功能性或容错选项,但我还是会说出来。
我们有一个相当直接的滑动窗口应用程序。键控流通过特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒,我们计算窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不占用内存。
有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情开始变得棘手。此时,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但根据这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。
令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。也许 10 或 12 个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有新的事件被处理等等。
在这一点上,我尝试了一些事情:
- 在问题上扔更多的金属(自动缩放也打开了)
- 大惊小怪 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
- 重构以减少我们存储的状态的足迹
(1) 并没有真正做太多。(2) 这似乎有所帮助,但随后又一次比我们之前看到的更大的流量高峰消除了任何好处 (3) 目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用与你想象的 Yelp 或 Airbnb 相比相当小,它们都使用 Flink 集群来处理大型应用程序,所以我无法想象我的状态真的有问题。
我会说我希望我们不必深刻改变对应用程序输出的期望。这个滑动窗口是一个非常有价值的数据。
编辑:有人问我的状态是什么样的 ValueState[FooState]
case class FooState(
entityType: String,
entityID: String,
events: List[BarStateEvent],
tableName: String,
baseFeatureName: String,
)
case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
编辑:我想强调用户大卫安德森在评论中所说的话:
有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。
这是必不可少的。对于其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储在某个时间片中。我的最终解决方案是将事件分成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载期,检查点保持在 3mb 并且它们总是在一秒钟内完成。
解决方案
如果您有一个 24 小时长的滑动窗口,并且它滑动 30 秒,那么每次登录都会分配给 2880 个单独的窗口中的每一个。没错,Flink 的滑动窗口是复制的。在本例中为 24 * 60 * 2 份。
如果您只是计算登录事件,则无需实际缓冲登录事件,直到窗口关闭。您可以改为使用 aReduceFunction
来执行增量聚合。
我的猜测是您没有利用此优化,因此当您有一个热键(IP 地址)时,处理该热键的实例具有不成比例的数据量,并且需要很长时间才能检查点。
另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地了解原因。
一种可能的补救方法是使用ProcessFunction
. 通过这样做,您可以避免维护 2880 个单独的窗口,并使用更有效的数据结构。
编辑(基于更新的问题):
我认为问题在于:使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每个状态访问和更新都必须经过 ser/de。这意味着您List[BarStateEvent]
正在反序列化,然后每次修改它时重新序列化。对于列表中包含 200k 事件的 IP 地址,这将是非常昂贵的。
你应该做的是使用ListState
or 或MapState
. 这些状态类型针对 RocksDB 进行了优化。RocksDB 状态后端可以在ListState
不反序列化列表的情况下追加。并且MapState
,map 中的每个键/值对都是一个单独的 RocksDB 对象,允许高效的查找和修改。
有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在Flink docs中有一个做类似事情的例子(但有翻滚的窗口)。
或者,如果您的状态可以放入内存,您可以使用 FsStateBackend。然后你的所有状态都将是 JVM 堆上的对象,而 ser/de 只会在检查点和恢复期间发挥作用。
推荐阅读
- scheme - 如何在 Scheme 中找到乘法持久性
- python - 为什么在实现字符串值时处理数据会变得很慢?
- machine-learning - 运行时错误:无法打开 ./dlib_model/shape_predictor_68_face_landmarks.dat
- c# - 为什么 Rectangle.Fill 多重绑定不起作用?
- spring-boot - 配置 CORS
- matlab - 在循环中散射时在轴之间切换
- node.js - 更新请求多对多 NestJS
- c# - 为什么缩放变化非常缓慢?
- c - 打印 void* 类型的值
- ruby-on-rails - 在 MacOS 上安装 Rails 时遇到问题