scala - Flink 广播状态
问题描述
- 我有一个 RichParallelSourceFunction(parallelism=1) 每 5 秒查询一次 MySQL,它发出一个时间戳列表,指示何时开始/停止写入接收器。
- 这个时间戳被广播到原始流(并行度=10)。
- 我已将 RichParallelSourceFunction 的并行度设置为 1,以减少对 MySQL 服务器的同时请求数。
- 我很困惑在这种情况下是否需要广播状态。为什么不将广播数据存储在运营商本地数据结构中?
- .broadcast(stateDesc) 与 .broadcast() 有什么区别?
class MyBroadcastProcessFunction(name: String) extends BroadcastProcessFunction[Log, TimestampList, Log] with CheckpointedFunction {
private var sortedTimestamps: IndexedSeq[Long] = _
@transient var buffer: ListState[(Log, Long)] = _
def processElement(value: Log, timestamp: Long, out: Collector[Log]): Unit = {
if (timestamp > sortedTimestamps(0))
out.collect(value)
else
buffer.put(value, 0L)
}
override def processBroadcastElement(timestamps: TimestampList,
ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
out: Collector[Log]): Unit = {
// add timestamps to SortedTimestamps
// do I need to use BroadcastState mapstate here? or just use operator-local data structures (ex. sortedTimestamps)?
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {}
override def initializeState(context: FunctionInitializationContext): Unit = {
val stateDesc = new ListStateDescriptor[(Log, Long)]("logBuffer",
classOf[(Log, Long)])
buffer = context.getOperatorStateStore.getListState(stateDesc)
}
}
解决方案
无论流的并行性如何,转换.broadcast()
都会将所有事件发送给所有下游操作员。医生说:
设置 DataStream 的分区,以便将输出元素广播到下一个操作的每个并行实例。回报:
这.broadcast(stateDesc)
是定义一种模式状态,您可以在其中基于通常非常小的另一个流在一个事件流上找到模式。这也是一个很好的参考。
您创建的方式BroadcastProcessFunction
是错误的,因为您只处理一个流。处理广播状态的正确方法,在你的情况下是来自 MySql 的时间戳,在processBroadcastElement()
方法中。在这种方法中,您必须更新全局/广播状态。
然后另一种方法processElement()
您会收到一个常规或快速流,您可以在其中找到基于您在第一种方法上更新的状态的模式processBroadcastElement()
。
以下是您应该如何实施的更多信息。有一些注意事项,例如您将无法更新ListState
. 最好使用MapState
链接中描述的 a 。
def processBroadcastElement(timestamps: TimestampList,
ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context, out: Collector[Log]): Unit = {
// update buffer state
// I don't think you can use .put() to update the ListState.
// Actually I think it is not possible to update a ListState, than you have to use MapState.
context.getOperatorStateStore.getListState(stateDesc)
.put(value.name, value);
}
def processElement(value: Log, timestamp: Long, out: Collector[Log]): Unit = {
buffer = context.getOperatorStateStore.getListState(stateDesc)
if (value: Log match within buffer ?)
out.collect(value) // MATCH
}
推荐阅读
- virtual-machine - 未使用 Azure 站点恢复故障转移复制 NSG 规则
- c# - 有没有办法统一更改脚本中 CharacterJoint 组件的值(例如 hightwistlimit)?
- pine-script - 在 TradingView Pine Script 上获取特定柱的日期
- jenkins - Jenkins UI - 缺少“环境变量”按钮
- swift - ld:警告:找不到或使用自动链接库“swiftCore”
- typescript - 如何在不共享 Express Requests 的情况下创建 Typescript Map
- mongodb - MongoError:此 MongoDB 部署不支持可重试写入。请将 retryWrites=false 添加到您的连接字符串
- r - 如何根据 dplyr 管道中另一列的第一行的值重命名列
- c# - RenderTargetBitmap 获取我的 UserControl 的空图像
- ios - 如何在 iOS 13 中的 Swift 中立即更改状态栏文本颜色