首页 > 解决方案 > 当输入/输出主题不存在时,kafka-streams 拓扑处于什么状态?

问题描述

我必须维护几个集成测试,包括一个在输入和输出主题以前不存在时检查 kafka-streams 拓扑行为的测试。

测试基本是这样的

@Test
fun `starting of the topology should fail even when the input and output topics are not created`() {
    // topics are not created here.

    val streamTopology = CustomStreamTopology(
        inputTopic,
        retryTopic,
        outputTopic,
        ... //couple of custom services
    )

    streamTopology.use {
        it.start()

        (await withPollInterval ONE_HUNDRED_MILLISECONDS).until { streamTopology.state == KafkaStreams.State.ERROR }
        assertFalse(streamTopology.isAlive())
    }
}

因此,它等待拓扑处于 ERROR 状态,然后断言流拓扑是否真正处于活动状态。

该测试在 kafka-streams v2.5.1 上运行良好。但由于一些漏洞,我不得不升级版本。

将其升级到 2.7.1 后,它会因 PENDING_SHUTDOWN 而超时。在将其进一步升级到 2.8.0 之后,它会一直处于 NOT RUNNING 状态,直到超时。

所以在我看来,处理不存在主题的方式在每个次要版本中都发生了变化。这是真的?

我找不到合适的 kafka-streams 拓扑状态图来详细解释每个状态。我能找到的只有这个。但这并不能告诉我太多关于 PENDING_SHUTDOWN、ERROR 和 NOT_RUNNING 状态的信息。

我的问题是,如果主题在拓扑创建之前不存在,我可以期望拓扑处于什么状态?

标签: kotlinapache-kafkaapache-kafka-streams

解决方案


推荐阅读