首页 > 解决方案 > Flink - 如何跨多个任务槽聚合和查询丰富的接收器函数状态

问题描述

我实现了一个丰富的接收器函数,它根据调用的对象执行一些网络调用。我希望能够计算这些事件的元数据,这些元数据由事件中包含的一些上下文信息(事件的批处理 ID)作为键控,并将这些元数据公开给外部系统。

例如,一个事件如下所示:

case class MyEvent(batchId: String, eventId: String, moreInformation: ...)

class MySink(...) extends RichSinkFunction[MyEvent] 
{
override def open(parameters: Configuration): Unit = {
    ...
  }

  override def close(): Unit = {
    ...
  }

  override def invoke(event: MyEvent) = {
    // some processing is done here

    ....
   //
   ...
     if (success) {
        I want to save the meta data here per event.batchId
        state.count.number.of.events.processed.for.event.batchId
     }
  }
}

在另一个地方,我希望能够以某种方式查询为 batchId 处理了多少事件的值

标签: scalaapache-kafkaapache-flinkkafka-consumer-apiflink-streaming

解决方案


几个选项:

计划 A:使用 Metric 对象和 MetricReporter 将数据公开给外部系统。这样做的缺点是指标没有检查点,如果有很多 batchId,您最终可能会用大量无法被 GC 处理的指标污染指标系统。

计划 B:将您的 RichSinkFunction 重写为 RichFlatMap(或 ProcessFunction),它发出持有 (batchId, number.of.events.in.batchId) 的元组流。您可以通过 batchId 键入此流,然后使用 KeyedProcessFunction 中的键入状态(例如)通过可查询状态存储和公开此状态。这样做的缺点是可查询状态只允许点查询(一次一个键)。

计划 C:在此变体中,外部系统可以查询在计划 B 中创建的状态,方法是将查询注入到广播到 KeyedBroadcastProcessFunction 的流中,该 KeyedBroadcastProcessFunction 包含键控 state.count.number.of.events.processed.for.event.batchId数据。然后您可以在 KeyedBroadcastProcessFunction 的 processBroadcastElement 方法中使用 ctx.applyToKeyedState 来响应这些查询。有关示例,请参阅其中一个 Flink 培训练习。

计划 D:将 B(或 C)的结果写入 redis、elasticsearch 或其他一些可查询的数据存储,并让外部系统从那里获取此信息。


推荐阅读