首页 > 解决方案 > 独立消费者抛出 InvalidGroupIdException

问题描述

这是一个两部分的问题。

一、代码为:

import java.time.Duration
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._
import io.StdIn._

object StandaloneConsumer {
  private final var topics = ""
  private final val BOOTSTRAPSERVERS = "hostname:9092"

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])

    val consumer = new KafkaConsumer[String, String](props)
    topics = readLine("Enter the topic name: ")

    try {
      val partitionInfos = consumer.partitionsFor(topics)
      val topicPartitions = new util.ArrayList[TopicPartition]
      if(partitionInfos != null) {
        for (partitionInfo <- partitionInfos.asScala) {
          topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
        }
        consumer.assign(topicPartitions)
        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))
          for(record <- records.asScala) {
            println(s"key = ${record.key}, value = ${record.value()}")
          }
          consumer.commitAsync()
        }
      }
    } catch {
      case e:Exception => e.printStackTrace()
    } finally {
      consumer.commitSync()
      consumer.close()
    }
  }
}

当我运行它时,它会正确使用所有数据,但最后会给出:

org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

这个错误。为什么?

二、我不明白这部分代码。到底是怎么回事?:

val topicPartitions = new util.ArrayList[TopicPartition]
  if(partitionInfos != null) {
    for (partitionInfo <- partitionInfos.asScala) {
      topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
    }

有人可以向我解释一下吗。谢谢你。

标签: scalaapache-kafkakafka-consumer-api

解决方案


在 Kafka 中,只有当您是消费者组的一部分时,提交才有意义(然后让 Kafka 集群协调员管理分区分配和偏移获取/提交操作)。关于您的代码,您似乎正在执行手动分区分配,但您仍在尝试提交:

          consumer.commitAsync()

关于消费者组的一些文档:

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

对于你的问题二

此代码正在获取属于您的主题的不同分区,然后您将这些分区分配给您的使用者。

您应该阅读更多有关 Kafka 消费者组的信息,并确定是否要使用它,或者自己处理分区分配。我强烈建议使用它(出于可扩展性目的),除非您真的想跳过它。


推荐阅读