java - RocksDB 时在算子(KeyedProcessFunction 和 RichMapFunction)之间共享状态
问题描述
根据上图,目前我需要在两个操作员之间共享状态,从一个KeyedProcessFunction
将设法处理事件并将它们从 X 类转换为 Y 类的操作员,并保持传入记录的状态以始终发送最新的类 Y 的信息到 Python 推理函数。
推理函数的结果需要映射回类 Y 并更新已经在ProcessFunction
之前创建的对象的状态Sink
。
据我所知,当 RocksDB 时,广播状态是不可能的。“没有 RocksDB 状态后端:广播状态在运行时保存在内存中,并且应该相应地进行内存配置。这适用于所有操作员状态。”
问题:
- 当我使用 RocksDB 作为状态后端时,最好的方法是什么?
KeyedProcessFunction
是否可以在 a和 a之间共享状态RichMapFunction
?
解决方案
使用 RocksDB 作为状态后端时,您可以使用广播状态。广播状态不会存储在 RocksDB 中——它会在堆上——但它会被检查点。所以广播状态需要足够小以适应内存。(此外,每个任务都会独立检查广播状态的副本。)
但是,我认为广播状态不会对这个用例有所帮助。它仅将状态广播到单个操作员的所有实例。
您不能在操作员之间共享状态。州是严格地方的。您可以将流程函数的输出流式传输到 RichMapFunction 中,以便它具有必要的信息。map 不能直接影响存储在 process 函数中的状态,但它可以拥有该状态的自己的副本。
但是,听起来您希望推理函数的输出来修改过程函数中的状态。DataStream API 不允许在数据流中出现这样的循环。但是你有几个选择:
(1) 将推理函数的结果流式传输到 kafka/kinesis 之类的东西,然后将该流作为另一个输入添加到 process 函数。(换句话说,如果您使用外部消息队列来解耦事物,则可能出现循环。当然,这会增加延迟。)
(2) 使用有状态函数API。它提供了有状态组件之间的任意通信模式(您不仅限于 DAG),还具有出色的 Python 支持等等。所有这些都在 Flink 运行时之上,因此您在一致性、恰好一次、可扩展性等方面获得相同的好处。
推荐阅读
- python - 安装 Jupyter Notebook 时遇到问题
- amazon-web-services - 在预留实例上启动实例
- python - 将 Rasberry 的两个 SPI 与两个不同的 RTD 一起使用
- node.js - 如何在 $week mongo 聚合中包含时区和 mongoose?
- php - 在数组上将 preg_replace 转换为 preg_replace_callback
- angular - 升级到 Angular 6.0 后,Angular 组件内容不显示
- javascript - 从 div 获取值到文本框
- python - 安装后无法点亮
- php - 符号代替文字,怎么改?
- python - 是否可以在 python 中使用 http.server 服务的目录中显示文件大小?