scala - 独立消费者抛出 InvalidGroupIdException
问题描述
这是一个两部分的问题。
一、代码为:
import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import io.StdIn._
object StandaloneConsumer {
private final var topics = ""
private final val BOOTSTRAPSERVERS = "hostname:9092"
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
val consumer = new KafkaConsumer[String, String](props)
topics = readLine("Enter the topic name: ")
try {
val partitionInfos = consumer.partitionsFor(topics)
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(topicPartitions)
while (true) {
val records = consumer.poll(Duration.ofMillis(500))
for(record <- records.asScala) {
println(s"key = ${record.key}, value = ${record.value()}")
}
consumer.commitAsync()
}
}
} catch {
case e:Exception => e.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
}
}
当我运行它时,它会正确使用所有数据,但最后会给出:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
这个错误。为什么?
二、我不明白这部分代码。到底是怎么回事?:
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
有人可以向我解释一下吗。谢谢你。
解决方案
在 Kafka 中,只有当您是消费者组的一部分时,提交才有意义(然后让 Kafka 集群协调员管理分区分配和偏移获取/提交操作)。关于您的代码,您似乎正在执行手动分区分配,但您仍在尝试提交:
consumer.commitAsync()
关于消费者组的一些文档:
对于你的问题二
此代码正在获取属于您的主题的不同分区,然后您将这些分区分配给您的使用者。
您应该阅读更多有关 Kafka 消费者组的信息,并确定是否要使用它,或者自己处理分区分配。我强烈建议使用它(出于可扩展性目的),除非您真的想跳过它。
推荐阅读
- python - PythonWin 在不同版本中打开不同的脚本(Python 2 & Python 3)
- go - 评估字符串(字符串到 [] 字符串)
- ios - Swift - Firebase - 订单收集
- java - 反应原生 webview 与 swift/java webview
- python - Discord.py 在部署到 Heroku 时说“通过了不正确的令牌”
- javascript - 为什么我的嵌套网格视图只显示一瞬间?
- .net - PowerShell 解析 .NET 选项卡完成方法和属性
- python - 使用反斜杠解析非常大的 JSON 文件(JSON 编码)
- ruby-on-rails - 循环中有没有办法知道你是否在最后一次迭代中?
- c++ - 有人可以解释一下吗?这是关于数组和数组内部 if 语句的一些处理