apache-spark - Spark流和kafka缺少所需的配置“partition.assignment.strategy”,没有默认值
问题描述
我正在尝试使用 yarn 与 Kafka 一起运行 spark 流应用程序。我收到以下堆栈跟踪错误-
原因:org.apache.kafka.common.config.ConfigException:缺少没有默认值的必需配置“partition.assignment.strategy”。在 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124) 在 org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:48) 在 org.apache.kafka.clients。 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:380) 上的 consumer.ConsumerConfig.(ConsumerConfig.java:194) org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:363) ) 在 org.apache.spark.streaming.kafka010.CachedKafkaConsumer.(CachedKafkaConsumer.scala:45) 在 org.apache.spark.streaming 的 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:350)。 kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194) 在 org.
这是我如何使用火花流创建 KafkaStream 的代码片段-
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "*boorstrap_url:port*",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "annotation-test",
//Tried commenting and uncommenting this property
//"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("*topic-name*")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())
我浏览了以下帖子-
据此,我已将我的 fat jar 中的 kafka util jar 从默认从 spark-stream-kafka-jar 打包的 0.10.1.0 更新为0.10.2.0版本作为瞬态依赖项。当我通过将master设置为本地来在单个节点上运行它时,我的工作也可以正常工作。我正在运行 spark 2.3.1 版本。
解决方案
添加kafka-clients-*.jar
到您的 spark jar 文件夹中。kafka-clients-*.jar
在kafka-*/lib
目录中。
推荐阅读
- java - 我正在尝试在 java 中构建多项选择测试,由于某种原因,我的代码仅适用于两个问题,而不再适用
- r - 使用 Neo4j 图形数据构建应用程序时出现 R Shiny 反应性环境错误
- rest-assured - Unirest 在 API 测试中优于 RestAssured 的优缺点是什么?
- php - laravel 刀片组件路径语法是什么意思?
- google-chrome - iOS Chrome issue
- php - 我们如何使用 ajax 将带有正斜杠“/”的数字传递给 codeigniter 控制器
- php - PHP代码说输入是空的,但它不是?
- r - R geom_hist limits 删除限制范围内的数据
- android - Cordova webview 在后台运行时在 Android 8.0 上丢失了 wifi 连接
- python - 选择语句仅返回结果中的第一行