apache-spark - 缺少必需的配置“partition.assignment.strategy”
问题描述
我正在与 Kafka 一起运行 Spark Structured Streaming。下面是 pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<!-- Put the Scala version of the cluster -->
<scalaVersion>2.12.10</scalaVersion>
<sparkVersion>3.0.1</sparkVersion>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${sparkVersion}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${sparkVersion}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>${sparkVersion}</version>
</dependency>
使用 shade 插件构建 fat jar。jar 使用命令在我的本地设置中按预期运行
spark-submit --master local[*] --class com.stream.Main --num-executors 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g prism-event-synch-rta.jar
但是,当我尝试使用带有命令的 yarn 在 spark 集群中运行相同的 jar 时:
spark-submit --master yarn --deploy-mode cluster --class com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g gs://jars/prism-event-synch-rta.jar
获取此异常:
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
我试过设置“partition.assignment.strategy”,然后它也不起作用。
编辑:也尝试使用包选项发送 kafka 客户端。结果是同样的例外。
spark-submit --packages org.apache.kafka:kafka-clients:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --master yarn --deploy-mode cluster --class com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g gs://jars/prism-event-synch-rta.jar
请帮忙。
解决方案
我怀疑问题出在 kafka 客户端 jar 上。我的第一个预感是,kafka 客户端 jar 必须在为在集群中运行而创建的 fat jar 中被覆盖。
为了使我的理论具体化,我在本地安装了相同的 spark 版本(3.0.1)并运行了这项工作。它航行得很漂亮。
我登录到 dataproc 3.0.1 集群并 sudo find /usr -name "kafka-clients*",瞧,我找到了旧版本的 jar。/usr/lib/hadoop/lib/kafka-clients-0.8.2.1.jar 我打开 spark.env 来验证,在运行时是否正在加载路径。毫不奇怪,它正在加载。
现在我们知道了根本原因。要找到解决方案,需要覆盖提供的 jar。做了一些研究,发现 maven shade 插件可以在打包时更改类的名称。
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>shade.org.apache.kafka</shadedPattern>
</relocation>
</relocations>
推荐阅读
- keycloak - Keycloak 资源服务器授权流程
- python - 熊猫的 to_sql() 方法将主键列作为 NULL 发送,即使该列不存在于数据框中
- python - 当变量设置为 True 时,我无法让我的 Discord Bot 发送消息
- javascript - 在特定时间修改 cookie Node.js
- c# - 如何清理发布文件夹,以免 DLL 不断被复制到我的网站?
- c++ - 在 C++ 中删除 [] char* 后程序卡住
- python - 通过 Webhook 发送上传的图像
- r - jags 贝叶斯线性回归,当先验相互依赖时,我如何设置先验?
- powershell - Powershell Export-CSV 但更改标题名称
- scheme - 功能直到->转换(球拍)