首页 > 解决方案 > Kafka Streams PAPI:AbstractProcessor 对象创建行为

问题描述

我有一个包含 AbstractProcessor(实际上是两个)的 Kafka Streams 拓扑。在其中一个中,我使用Punctuation APIwithWALL_CLOCK_TIME来安排刷新处理所需的一些参考数据。我在任务开始时执行此操作,然后每隔一段时间安排一次(比如说 1 小时)。 num.stream.threads配置为 2。

例如,我有一些这样的代码:

def loadReferenceData() = {
      logger.info("Loading All Reference Data...")
      // atomically (re)load some data
}

override def init(context: ProcessorContext) = {
      super.init(context)
      logger.info("Loading reference data initially...")
      loadReferenceData()

      context.schedule(1000 * reloadDataSeconds, PunctuationType.WALL_CLOCK_TIME, (timestamp) => {
        loadReferenceData()
        context.commit(); // Unsure if necessary
      });
}

在没有传入记录的情况下,在运行应用程序的单个实例的日志中,我可以看到这些 init 日志:

    [2019-06-11 08:54:19,518] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:31,080] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:29,713] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:29,682] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:20,855] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:19,714] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:19,516] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:31,036] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:29,668] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:29,653] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 06:54:20,845] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 06:54:19,726] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)

因此,似乎loadReferenceData每小时都有许多日志可供输入。我预计每小时只能看到 2 个条目(2 个线程),但还有更多(通常是 6 个)。

在日志中,我只看到在应用程序创建开始时创建了 6 次处理器:

[2019-06-10 16:54:19,849] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:18,231] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:17,874] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:29,675] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:27,132] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:24,923] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)

所以这是有道理的:处理器创建一次,每小时更新一次。

但是当我在我的应用程序中添加更多负载时,我可以经常看到新处理器对象的创建。

标签: scalaapache-kafkaapache-kafka-streams

解决方案


Kafka-Streams 将为输入主题的每个分区创建一个处理器,每个处理器都有自己的时间表。(如果您使用状态存储,这实际上非常有用,因为状态也会被分区。)

如果您想对内部状态应用常规操作,调度程序很有用。它很好地停止了常规处理,并确保您在计划操作期间所做的一切都是一致的。如果手头的任务与流本身无关,那么单独的线程可能同样好。

如果您选择单独的线程,请确保在kafka-streams 线程崩溃时适当地终止它。否则,您的应用程序将挂在计时器线程上,但不会消耗任何 kafka 消息。

增加线程数(num.stream.threads)意味着将同时消耗多个分区。它与启动多个实例的行为相同。请参阅https://docs.confluent.io/current/streams/architecture.html#threading-model


推荐阅读