首页 > 解决方案 > 如何从与不同代理关联的多个 Kafka 主题中消费?

问题描述

如何从与不同代理关联的多个 Kafka 主题中消费?

我有一个 Spring Boot 应用程序需要使用 2 个主题,但这些主题与不同的代理相关联。

我正在使用带有 @Listener 注释的 Spring Kafka,我看到有一些方法可以从与同一代理相关联的 2 个主题中消费,而不是不同的代理。不幸的是,我在 Spring Boot 或 Spring Kafka 文档中没有看到有关如何执行此操作的任何有用信息。

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


有几种方法可以做到这一点,不幸的是 Spring-Boot 和 Spring-Kafka 并没有明确实现这一点的最佳实践。SO中还有很多答案可以解决使用同一个Broker从多个主题中消费的问题,但这并不总是那么简单。

方法一

解决此问题的最简单方法是在Kafka 侦听器注释中添加属性参数:

@KafkaListener(topics = ["\${topic-1-name}"], properties = ["bootstrap.servers=\${bootstrap-server-1}"])
fun topic1Listener(@Payload messages: List<String>, ack: Acknowledgment){
    // Do work
}

@KafkaListener(topics = ["\${topic-2-name}"], properties = ["bootstrap.servers=\${bootstrap-server-2}"])
fun topic2Listener(@Payload messages: List<String>, ack: Acknowledgment){
    // Do work
}

我们在属性参数值中指定的任何键/值对都将覆盖 DefaultKafkaConsumerFactory 中的默认键/值。在这种情况下,我们将 bootstrap.servers 属性覆盖为我们自己的每个主题的特定引导服务器地址。

但是,我们仍然可以使用“不错”的 Spring-Boot 功能,例如自动创建主题和允许 Spring-Boot 为我们的应用程序设置组 ID。我们只需要将 group-id 参数保留在 application.properties 或 application.yml 文件中。

spring:
  kafka:
    consumer:
      group-id: group-id-of-your-choice
  • 请注意,我们可以为我们的两个消费者使用相同的 group-id,即使它们可能跨越多个代理。实际上,为整个应用程序设置 1 个组 ID 是一种很好的做法,这样监控消费者延迟以及其他指标就变得简单了。
  • 另请注意,我们不再将主题名称存储在 Spring 配置部分中,我们需要在其他地方执行此操作,因为我们不希望 Spring-Boot 使用不正确的代理地址配置我们的主题。当我们覆盖属性时,我们让监听器处理该部分,如上所示。

还有很多其他方法可以实现这一点,但这是我发现并测试过的最简单的方法。

方法二

其他方法包括创建您自己的自定义 ConsumerFactory 和 KafkaListenerContainerFactory 对象,然后配置每个 Factory 中的属性以使用您选择的引导服务器。但是,第一种方法更简洁,使用默认的 Container Factory。以下是如何使用您自己的属性创建自定义工厂。

@Bean
fun ConsumerFactory1(): DefaultKafkaConsumerFactory<String, String> {
        val props = mutableMapOf<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers1!!
        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
}

@Bean
fun ContainerFactory1(): ConcurrentKafkaListenerContainerFactory<String, String>? {
        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = ConsumerFactory1()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.isBatchListener = true
        return factory
}

@Bean
fun ConsumerFactory2(): DefaultKafkaConsumerFactory<Any?, Any?> {
        val props = mutableMapOf<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers2!!
        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java

}

@Bean
fun ContainerFactory2(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = ConsumerFactory2()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.isBatchListener = true
        return factory
}

这里有一些东西要解压。

  • 容器工厂本质上是相同的,我们只是使用它们各自的消费者工厂和每个相关的属性。
  • 我使用内置的 StringDeserializer,因为我的应用程序将消息作为字符串使用,然后使用 Jackson 将 Json 字符串序列化为对象。您的应用程序可能需要不同的反序列化器,甚至是自定义反序列化器,具体取决于主题上的数据序列化方式。
  • 将 AckMode 设置为 MANUAL 允许我们在确认已使用来自主题的消息时进行控制。
  • 将批处理侦听器设置为 true 允许我们的侦听器分批侦听消息,而不是一次侦听 1 个消息。
  • 通过这个实现,就使用 Kafka 而言,我们完全将 Spring-Boot 从我们的应用程序中剥离出来。所以我们的@Listener 注解看起来会有点不同:
@KafkaListener(topics = ["\${kafka-topic-1}"], containerFactory = "ContainerFactory1", groupId = "\${kafka.group-id}")
  • 我们不再让 Spring-Boot 为我们配置 Group-Id,所以我们现在需要在 Listener 中指定它。这意味着您的 application.properties 文件中不再定义 Spring.Kafka.Consumer 属性,我们需要以编程方式执行此操作。我们现在需要手动配置一些其他的东西,比如在启动时自动配置主题,如果你需要这个功能,你需要手动设置一个KafkaAdmin bean

结论

还有更多方法可以实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现成功的解决方案中的 2 个,方法 1 易于理解、实施和测试,无需过多涉及 Spring-Boot 和 Spring-Kafka 的深度!如果这是您需要的功能,这些方法将与超过 2 个代理一起使用。


推荐阅读