首页 > 解决方案 > 可能快照机制在 Apache Flink 中消耗越来越多的内存

问题描述

我正在学习快照机制在 Flink 中的工作原理。

据我了解,JobManager 会以固定的间隔将障碍插入每个数据源,每个操作员一旦从所有数据源接收到第 n 个障碍,就会进行快照。

如果我是对的,似乎这种机制在某些情况下可能会使用越来越多的内存。

这是一个例子:

说有两个数据源:Source 1Source 2,和一个运算符。

Source 1 -----\
               ------ Operator
Source 2 -----/

Source 1正在生成整数流:1、2、3、4、5...

Source 2正在生成字符流:a, b, c, d, e...

操作员这样做:它需要两个输入Source 1和一个输入Source 2来生成输出:1a2、3b4、5c6、7d8...

假设 JobManager 将屏障插入到两个数据源,如下所示:

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

现在让我们开始吧。

当 和 的两个“BARRIER A”Source 1进入Source 2Operator 时,Flink 会对当前状态为1and的 Operator 进行快照a,因为BARRIER A 进入 Operator 时1和已经在 Operator 中。a

然后,当两个“BARRIER B”进入 Operator 时,Operator 已经完成了它的第一个任务: generate 1a2,Flink 会再做一个快照:NA, bNA表示当前没有来自 的新输入Source 1

同时,每个快照都会存储到 RAM、FS 或 RocksDB 中(取决于我们如何配置 Flink)。

如果我是对的,我认为 Flink 在这个例子中会生成越来越多的快照。因为 的消费速度Source 1永远是 的两倍Source 2

我是不是误会了什么?

标签: apache-flinkflink-streamingsnapshotfault-tolerance

解决方案


有趣的思想实验。

如果您限制自己仅使用 Flink API 的标准部分,则无法实现一个用户函数,该函数将为从 Source 2 读取的每个输入从 Source 1 读取两个输入。CoProcessFunction例如,在实现 a 时,您处于Flink 运行时的怜悯,它将根据自己的内部逻辑从任一流中提供事件。这两个流将相互竞争,可能在不同的线程甚至不同的进程中运行。当流汇聚时,如果来自两个输入的事件没有按照您希望的顺序提供,您必须在 Flink 状态下缓冲它们,直到您准备好处理它们。

这可能导致大量缓冲需求的常见情况是在实现事件时间连接时,其中一个流在时间戳方面远远领先于其他流(例如,加入外汇汇率的金融交易,使用汇率如果汇率流滞后,则在交易时生效)。但是这种缓冲可以在 RocksDB 中完成,并且不必对内存施加压力。

请注意,这种状态缓冲完全发生在您的应用程序中——Flink 没有灵活的网络缓冲区,可以在背压期间膨胀。

另一点是快照永远不会存储在本地文件系统或 RocksDB 中。如果您选择使用 RocksDB 状态后端,则每个任务管理器的活动、工作状态将存储在本地 RocksDB 实例中,但状态备份(快照)将存储在分布式文件系统中。

至于你这样描述的情况,

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

这不会发生。没有什么可以安排这两个源以这种方式同步——它们将比这张图显示的更独立地进行。因为 Flink 在流水线阶段之间只有少量、固定数量的网络缓冲,所以执行图中发生的任何背压都会迅速传播回一个或两个源。当这种情况发生时,背压的源将无法将任何事件推送到管道中,直到背压缓解——但与此同时,另一个源可能会继续取得进展。屏障将大致同时由两个源独立插入到两个流中,但如果源 2 正在经历频繁的背压(例如),它可能看起来更像这样:

1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...

推荐阅读