apache-spark - Spark Kafka data consumption package
Problem
I tried to consume my Kafka topic with the following code from the documentation:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "first_topic") \
    .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
I got this error:
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
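This AnalysisException means the Kafka connector is not on Spark's classpath; it ships separately from Spark itself. When choosing the `--packages` coordinate, the artifact's Scala suffix and version must match your Spark installation. A small helper (hypothetical, purely for illustration) makes that relationship explicit:

```python
def kafka_package(scala_version: str, spark_version: str) -> str:
    """Maven coordinate of the Structured Streaming Kafka connector.

    The Scala suffix (e.g. 2.12) must match the Scala build of your
    Spark distribution, and the version must match Spark itself,
    otherwise --packages pulls in an incompatible jar.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

# For Spark 3.1.2 built against Scala 2.12:
print(kafka_package("2.12", "3.1.2"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
```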
So I tried:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...
to install the Kafka package and its dependencies, but then I got this error:
21/06/21 13:45:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Failed to get main class in JAR with error 'File file:/home/soheil/spark-3.1.2-bin-hadoop3.2/... does not exist'. Please specify one with --class.
at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What should I do to install this package?
Solution
The error you are getting here is unrelated to Kafka:
file:/home/soheil/spark-3.1.2-bin-hadoop3.2/... does not exist
This is referring to your HADOOP_HOME and/or HADOOP_CONF_DIR environment variables, which Spark depends on being set correctly. Check these, and verify that you can run the Spark Structured Streaming + Kafka WordCount example before running your own script:
$ bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
structured_kafka_wordcount.py \
host1:port1,host2:port2 subscribe topic1,topic2
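As a rough sanity check before running spark-submit, you can confirm that the variables mentioned above are set and point at real directories. This is a sketch; the variable names are the ones Spark's launcher scripts read:

```shell
# Report whether an environment variable is set and names an existing directory.
check_dir() {
  name="$1"
  value="$2"
  if [ -n "$value" ] && [ -d "$value" ]; then
    echo "$name ok"
  else
    echo "$name missing or not a directory"
  fi
}

check_dir HADOOP_HOME "$HADOOP_HOME"
check_dir HADOOP_CONF_DIR "$HADOOP_CONF_DIR"
```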
The next part of the message, "Please specify one with --class.", indicates that the CLI argument parser failed, most likely because you mistyped a spark-submit option or there is a space somewhere in your file path.
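The effect of an unquoted space in the application path can be seen with Python's shlex module, which tokenizes a command line the same way the shell does (the path below is made up):

```python
import shlex

# An unquoted space splits the script path into two tokens, so spark-submit
# sees "/home/user/my" as the application path and cannot find it.
unquoted = shlex.split('spark-submit --packages pkg /home/user/my app/stream.py')
print(unquoted)
# ['spark-submit', '--packages', 'pkg', '/home/user/my', 'app/stream.py']

# Quoting keeps the path together as one argument.
quoted = shlex.split('spark-submit --packages pkg "/home/user/my app/stream.py"')
print(quoted)
# ['spark-submit', '--packages', 'pkg', '/home/user/my app/stream.py']
```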