首页 > 解决方案 > RocksDB 时在算子(KeyedProcessFunction 和 RichMapFunction)之间共享状态

问题描述

在此处输入图像描述

根据上图,目前我需要在两个操作员之间共享状态,从一个KeyedProcessFunction将设法处理事件并将它们从 X 类转换为 Y 类的操作员,并保持传入记录的状态以始终发送最新的类 Y 的信息到 Python 推理函数。

推理函数的结果需要映射回类 Y 并更新已经在ProcessFunction之前创建的对象的状态Sink

据我所知,当 RocksDB 时,广播状态是不可能的。“没有 RocksDB 状态后端:广播状态在运行时保存在内存中,并且应该相应地进行内存配置。这适用于所有操作员状态。”

问题:

  1. 当我使用 RocksDB 作为状态后端时,最好的方法是什么?
  2. KeyedProcessFunction是否可以在 a和 a之间共享状态RichMapFunction

标签: javaapache-flinkflink-streamingflink-cep

解决方案


使用 RocksDB 作为状态后端时,您可以使用广播状态。广播状态不会存储在 RocksDB 中——它会在堆上——但它会被检查点。所以广播状态需要足够小以适应内存。(此外,每个任务都会独立检查广播状态的副本。)

但是,我认为广播状态不会对这个用例有所帮助。它仅将状态广播到单个操作员的所有实例。

您不能在操作员之间共享状态。州是严格地方的。您可以将流程函数的输出流式传输到 RichMapFunction 中,以便它具有必要的信息。map 不能直接影响存储在 process 函数中的状态,但它可以拥有该状态的自己的副本。

但是,听起来您希望推理函数的输出来修改过程函数中的状态。DataStream API 不允许在数据流中出现这样的循环。但是你有几个选择:

(1) 将推理函数的结果流式传输到 kafka/kinesis 之类的东西,然后将该流作为另一个输入添加到 process 函数。(换句话说,如果您使用外部消息队列来解耦事物,则可能出现循环。当然,这会增加延迟。)

(2) 使用有状态函数API。它提供了有状态组件之间的任意通信模式(您不仅限于 DAG),还具有出色的 Python 支持等等。所有这些都在 Flink 运行时之上,因此您在一致性、恰好一次、可扩展性等方面获得相同的好处。


推荐阅读