scala - Apache flink 广播状态被刷新
问题描述
我正在使用广播模式连接两个流并从一个流读取数据。代码看起来像这样
case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
override def processBroadcastElement(in2: (String, Double),
context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
collector:Collector[MyObject]):Unit={
context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
}
override def processElement(obj: MyObject,
readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double),
MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
//If I print the context of the state here sometimes it is empty.
out.collect(MyObject(new, properties, go, here))
}
}
状态描述符:
val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])
我的执行代码如下所示。
val streamA :DataStream[MyObject] = ...
val streamB :DataStream[(String,Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
streamA.connect(streamB).process(new Broadcast)
问题出在processElement
函数中,状态有时为空,有时不是。状态应该始终包含数据,因为我不断地从一个我知道它有数据的文件中流式传输。我不明白为什么它正在刷新状态并且我无法获取数据。
processBroadcastElement
我尝试在将数据置于状态之前和之后添加一些打印,结果如下
0 - 1
1 - 2
2 - 3
.. all the way to 48 where it resets back to 0
更新:我注意到的是,当我减少流执行上下文的超时值时,结果会好一些。当我增加它时,地图总是空的。
env.setBufferTimeout(1) //better results
env.setBufferTimeout(200) //worse result (default is 100)
解决方案
每当在 Flink 中连接两个流时,您无法控制 Flink 将事件从两个流传递到您的用户函数的时间。因此,例如,如果存在可从流 A 处理的事件,以及可从流 B 处理的事件,则接下来可能会处理其中一个事件。您不能期望 broadcastedStream 以某种方式优先于其他流。
根据您的要求,您可以采用各种策略来应对两个流之间的竞争。例如,您可以使用 KeyedBroadcastProcessFunction 并使用其 applyToKeyedState 方法在新的广播事件到达时迭代所有现有的键控状态。
推荐阅读
- java - 在 Java 中连接到 sqlite 数据库的问题
- gambas - 类容器-gambas 错误中的未知符号“textbox1”
- java - 是否有非异常方式访问 getDeclaredMethod?
- python - Python 默认字典参考
- android - Glide:调整 ImageView 的大小以适应滑动中的图像,而不是相反
- amazon-web-services - 如何从 SNS 订阅中提取令牌并在确认订阅方法中使用它?
- wordpress - 自定义分类帖子名称未检索
- hyperledger-fabric - Hyperledger Fabric:/etc/hyperledger/fabric-ca-server/ca-chain.pem 中不存在链文件
- audio - 使用什么 ffmpeg 命令将无符号整数列表转换为音频文件?
- java - 如何在 Java 中使用 ComboBox 数据填充 ArrayList