scala - Spark streaming 2.4.0 获取 org.apache.spark.sql.AnalysisException:找不到数据源:kafka
问题描述
尝试从 Kafka 读取数据时出现以下错误。我正在使用 docker-compose 来运行 kafka 和 spark。
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
这是我的阅读代码:
object Livedata extends App with LazyLogging {
logger.info("starting livedata...")
val spark = SparkSession.builder().appName("livedata").master("local[*]").getOrCreate()
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "topic")
.option("startingOffsets", "latest")
.load()
df.printSchema()
val hadoopConfig = spark.sparkContext.hadoopConfiguration
hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
}
在阅读了几个答案后,我添加了 sbt build 的所有包
这是 build.sbt 文件:
lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "com.live.data",
version := "0.1.0",
scalaVersion := "2.12.2",
assemblyJarName in assembly := "livedata.jar"
)),
name := "livedata",
libraryDependencies ++= List(
"org.scalatest" %% "scalatest" % "3.0.5",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"org.apache.spark" %% "spark-sql" % "2.4.0",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided",
"org.apache.kafka" % "kafka-clients" % "2.5.0",
"org.apache.kafka" % "kafka-streams" % "2.5.0",
"org.apache.kafka" %% "kafka-streams-scala" % "2.5.0"
)
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs@_*) => MergeStrategy.discard
case x => MergeStrategy.first
}
不知道这里的主要问题是什么。
更新:
最后我从这里得到了解决方案连接火花结构化流+卡夫卡时出错
主要问题是得到这个 org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception because spark-sql-kafka library is not available in classpath & 它无法找到 org.apache.spark.sql.sources .DataSourceRegister 在 META-INF/services 文件夹中。
以下代码块需要在 build.sbt 中添加。这将在最终的 jar 中包含 org.apache.spark.sql.sources.DataSourceRegister 文件。
// META-INF discarding
assemblyMergeStrategy in assembly := {
case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
case PathList("META-INF",xs @ _*) => MergeStrategy.discard
case "application.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}```
解决方案
spark-sql-kafka-0-10 没有提供,所以去掉那部分依赖。(虽然提供了 spark-sql,所以你可以将它添加到那个)
您也不应该拉 Kafka Streams(因为 Spark 不使用它),并且 kafka-clients 由 sql-kafka 传递,所以也不需要
推荐阅读
- nativescript-vue - 网格布局中的非活动按钮 - Nativescript-vue
- python - Pandas - zfill 混合列中的仅数值
- jpa - JPA 合并覆盖延迟加载的属性
- php - 关闭弹出窗口后重定向到新页面
- ms-access - 如何在 Access 上删除多个选定的记录?
- mysql - 如何从 Laravel 创建这种类型的 JSON 数据?
- spring - @Transactional(rollbackFor = Exception.class) 和 @Transactional(propagation=Propagation.REQUIRED) 有什么区别
- javascript - 事件未使用函数绑定定义
- css - Internet Explorer 中 bootstrap 4 的 img-fluid 问题
- html - Z-Index 与 Slim Select 的可折叠 Div 中的溢出