Flink broadcast state

Problem description

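    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
    import org.apache.flink.util.Collector

    class MyBroadcastProcessFunction(name: String) extends BroadcastProcessFunction[Log, TimestampList, Log] with CheckpointedFunction {

      private var sortedTimestamps: IndexedSeq[Long] = IndexedSeq.empty
      @transient var buffer: ListState[(Log, Long)] = _

      // processElement receives a ReadOnlyContext, not a raw timestamp
      override def processElement(value: Log,
                                  ctx: BroadcastProcessFunction[Log, TimestampList, Log]#ReadOnlyContext,
                                  out: Collector[Log]): Unit = {
        val timestamp: Long = ctx.timestamp() // event timestamp of the current element
        if (sortedTimestamps.nonEmpty && timestamp > sortedTimestamps(0))
          out.collect(value)
        else
          buffer.add((value, 0L)) // ListState is appended to with add(), not put()
      }

      override def processBroadcastElement(timestamps: TimestampList,
                                           ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
                                           out: Collector[Log]): Unit = {
        // add timestamps to sortedTimestamps
        // do I need to use BroadcastState mapstate here? or just use operator-local data structures (ex. sortedTimestamps)?
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {}

      override def initializeState(context: FunctionInitializationContext): Unit = {
        val stateDesc = new ListStateDescriptor[(Log, Long)]("logBuffer", classOf[(Log, Long)])
        buffer = context.getOperatorStateStore.getListState(stateDesc)
      }
    }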

Tags: scala, apache-flink

Solution


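Regardless of the stream's parallelism, the .broadcast() transformation sends every event to every parallel instance of the downstream operator. The documentation says:

Sets the partitioning of the DataStream so that the output elements are broadcasted to every parallel instance of the next operation.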
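To make the difference concrete, here is a small plain-Scala model. It is not Flink code, and the object and method names are made up for illustration. It shows how three elements reach a downstream operator with parallelism 2 under broadcast(), compared with a round-robin distribution, which is roughly what rebalance() does.

```scala
// Plain-Scala model of partitioning (NOT the Flink API); names are hypothetical.
object BroadcastPartitioningModel {

  // broadcast(): every parallel instance receives every element
  def broadcast[T](elems: Seq[T], parallelism: Int): Vector[Seq[T]] =
    Vector.fill(parallelism)(elems)

  // Round-robin for contrast: each element goes to exactly one instance
  def roundRobin[T](elems: Seq[T], parallelism: Int): Vector[Seq[T]] =
    Vector.tabulate(parallelism) { i =>
      elems.zipWithIndex.collect { case (e, j) if j % parallelism == i => e }
    }

  def main(args: Array[String]): Unit = {
    val events = Seq("a", "b", "c")
    println(broadcast(events, 2))  // Vector(List(a, b, c), List(a, b, c))
    println(roundRobin(events, 2)) // Vector(List(a, c), List(b))
  }
}
```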

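By contrast, .broadcast(stateDesc) sets up broadcast state, the so-called broadcast state pattern. In this pattern you look for patterns in one event stream based on another stream that is usually very small. This is also a good reference.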

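The way you built your BroadcastProcessFunction is wrong because you only process one of the two streams: processBroadcastElement() is empty, and the timestamps are kept in an operator-local field. The broadcast state, which in your case is the timestamps coming from MySQL, should be handled in processBroadcastElement(). That method is where you update the global (broadcast) state.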

The other method, processElement(), receives the regular (fast) stream. There you can look for patterns based on the state you updated in processBroadcastElement().
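The split of responsibilities between the two methods can be illustrated without Flink. The following plain-Scala model rests on several assumptions. `Rule`, `Event`, `OperatorInstance` and `BroadcastPatternDemo` are hypothetical names. A `mutable.Map` stands in for the broadcast state, and each `OperatorInstance` plays the role of one parallel instance of the operator. It is not Flink API code.

```scala
import scala.collection.mutable

// Plain-Scala model of the broadcast state pattern (NOT the Flink API).
// Hypothetical types: a broadcast rule carries a name and a cut-off timestamp.
final case class Rule(name: String, cutoff: Long)
final case class Event(id: String, ts: Long)

// One object stands for one parallel instance of the operator
final class OperatorInstance {
  // Map-shaped stand-in for the broadcast state, keyed by rule name
  private val broadcastState = mutable.Map.empty[String, Rule]

  // Like processBroadcastElement(): the only place the state is written.
  // A rule with an existing name replaces the previous one.
  def onBroadcast(rule: Rule): Unit = broadcastState(rule.name) = rule

  // Like processElement(): reads the state without modifying it and
  // emits the event if it is later than any rule's cut-off
  def onElement(e: Event): Option[Event] =
    if (broadcastState.values.exists(r => e.ts > r.cutoff)) Some(e) else None
}

object BroadcastPatternDemo {
  def main(args: Array[String]): Unit = {
    val instances = Vector.fill(2)(new OperatorInstance)
    // Broadcast side: every instance receives the rule
    instances.foreach(_.onBroadcast(Rule("mysql", 100L)))
    // Regular side: each event reaches only one instance
    println(instances(0).onElement(Event("x", 150L))) // Some(Event(x,150))
    println(instances(1).onElement(Event("y", 50L)))  // None
  }
}
```

Because every instance receives every rule, each instance can decide about its own share of the regular events without talking to the others.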

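Below is more on how to implement it. There is one caveat: broadcast state cannot be a ListState. In Flink, broadcast state is always map-shaped, because it is declared with a MapStateDescriptor and accessed as a BroadcastState. Use a MapState as described in the link. Also note that in processElement() the broadcast state is read-only.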

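    // Descriptor for the broadcast state; pass the same descriptor to .broadcast(timestampsDesc)
    // when building the broadcast stream. Assumes TimestampList has a `name` field.
    val timestampsDesc = new MapStateDescriptor[String, TimestampList](
      "timestamps", classOf[String], classOf[TimestampList])

    override def processBroadcastElement(timestamps: TimestampList,
                                         ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context,
                                         out: Collector[Log]): Unit = {
      // Broadcast state is map-shaped (BroadcastState), so update it with put()
      ctx.getBroadcastState(timestampsDesc).put(timestamps.name, timestamps)
    }

    override def processElement(value: Log,
                                ctx: BroadcastProcessFunction[Log, TimestampList, Log]#ReadOnlyContext,
                                out: Collector[Log]): Unit = {
      // On the non-broadcast side the broadcast state is read-only
      val state = ctx.getBroadcastState(timestampsDesc)
      if (matches(value, state)) // `matches` is a hypothetical helper holding your matching logic
        out.collect(value) // MATCH
    }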
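The matching step can use the question's own rule: emit a log whose timestamp is later than the earliest known timestamp, and buffer it otherwise. That rule can be written as a pure function over map-shaped state. This is only a sketch under assumptions. The state is modelled as an immutable `Map[String, Seq[Long]]` standing in for the `TimestampList` values, and `LogRouting`, `decide`, `Emit` and `Buffer` are hypothetical names. In a real processElement() the values would come from the read-only broadcast state.

```scala
// Plain-Scala sketch of the question's emit-or-buffer rule (NOT Flink API code).
object LogRouting {
  sealed trait Decision
  case object Emit extends Decision
  case object Buffer extends Decision

  // Emit a log when its timestamp is later than the earliest broadcast timestamp.
  // Buffer it otherwise, including while no timestamps have arrived yet.
  def decide(logTs: Long, state: Map[String, Seq[Long]]): Decision = {
    val all = state.values.flatten
    if (all.nonEmpty && logTs > all.min) Emit else Buffer
  }

  def main(args: Array[String]): Unit = {
    val state = Map("mysql" -> Seq(300L, 100L, 200L))
    println(decide(150L, state))     // Emit
    println(decide(50L, state))      // Buffer
    println(decide(150L, Map.empty)) // Buffer
  }
}
```

Keeping the decision a pure function makes it easy to unit-test separately from the Flink runtime. Unlike `sortedTimestamps(0)` in the question, it also does not fail before the first broadcast element arrives.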
