scala - Apache Spark inner join of 2 DataFrames throws TreeNodeException
Problem description
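I am very new to Apache Spark, and I am trying to inner join a books table with a table containing book_id and read_count. The goal is to produce a table of book titles and their corresponding read counts.
To start, I have a table booksRead containing records of users reading books, which I group by book_id and order by read frequency.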
booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
.orderBy($"read_count".desc)
.show()
+-------+----------+
|book_id|read_count|
+-------+----------+
| 8611| 565|
| 14| 436|
| 11850| 394|
| 15| 357|
| 11803| 324|
+-------+----------+
only showing top 5 rows
I am trying to inner join it with the table books, which looks like this:
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
| id| created_at| updated_at| title| isbn_13| isbn_10| image_url| description| publisher|author_id|year|overall_rating|audible_link| google_url| query_title|category|language|number_of_reviews|waterstone_link|amazon_available|is_ebook|page_count|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
|115442|2018-07-25 00:59:...|2018-07-25 00:59:...|Representation of...|9781361479278|1361479272|http://books.goog...|This dissertation...|Open Dissertation...| 62130|2017| null| null|http://books.goog...|representation of...| | en| 0| null| true| false| null|
|115450|2018-07-25 00:59:...|2018-07-25 00:59:...|Imag(in)ing the W...|9789004182981|9004182985|http://books.goog...|This study examin...| BRILL| 73131|2010| null| null|http://books.goog...|imagining the war...| | en| 0| null| true| false| null|
|218332|2018-08-19 14:48:...|2018-08-19 14:48:...|My Life With Tibe...|9781462802357|1462802354|http://books.goog...|Your child is a m...| Xlibris Corporation| 118091|2008| null| null|https://play.goog...|my life with tibe...| | en| 0| null| true| false| null|
|186991|2018-08-11 11:08:...|2018-08-11 11:08:...| NOT "Just Friends"|9781416586401|1416586407|http://books.goog...|One of the world’...| Simon and Schuster| 7687|2007| null| null|https://play.goog...| not just friends| | en| 0| null| true| false| null|
|247317|2018-09-06 08:23:...|2018-09-06 08:23:...|OCR AS and A Leve...|9781910523056|1910523054|https://images-eu...|A complete course...| PG Online Limited| 128220|2016| null| null| null|ocr as and a leve...| null| English| null| null| true| false| null|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
only showing top 5 rows
The join matches book_id against id in the books table, using the following command:
booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
.orderBy($"read_count".desc)
.join(booksDF, booksReadDF.col("book_id") === booksDF.col("id"), "inner")
.show()
But I get this error:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(book_id#4, 200)
+- *(3) Sort [read_count#67L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
+- Exchange hashpartitioning(book_id#4, 200)
+- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
+- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
at DBConn$.main(DBConn.scala:36)
at DBConn.main(DBConn.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
+- Exchange hashpartitioning(book_id#4, 200)
+- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
+- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 52 more
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 56
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 72 more
19/06/17 16:21:47 INFO SparkContext: Invoking stop() from shutdown hook
19/06/17 16:21:47 INFO SparkUI: Stopped Spark web UI at http://10.245.65.12:4040
19/06/17 16:21:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/17 16:21:47 INFO MemoryStore: MemoryStore cleared
19/06/17 16:21:47 INFO BlockManager: BlockManager stopped
19/06/17 16:21:48 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/17 16:21:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/17 16:21:48 INFO SparkContext: Successfully stopped SparkContext
19/06/17 16:21:48 INFO ShutdownHookManager: Shutdown hook called
19/06/17 16:21:48 INFO ShutdownHookManager: Deleting directory /private/var/folders/ql/dpk0v2gs15z83pvwt_g3n7lh0000gn/T/spark-9368b8cb-0cf6-45a5-9548-a9c1975dab46
Solution
Your core exception is
java.lang.IllegalArgumentException: Unsupported class file major version 56
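This indicates that at some point you are trying to run bytecode compiled for a different version of Java than your runtime supports (class file major version 56 corresponds to Java 12). Make sure you are running Spark on a Java 8 JRE, and make sure all of your dependencies (for example your Postgres JDBC driver) are also built for Java 8.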
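To confirm which JVM the driver is actually running on, a small check like the one below may help. It is only a sketch: the object name JavaVersionCheck and the helper javaReleaseFor are hypothetical and not part of the original post. It relies on the standard system properties java.version, java.class.version and java.home, and on the fact that a class file major version equals the Java feature release plus 44.

```scala
// Hypothetical helper, not part of the original post: prints the JVM that is
// actually running the driver so it can be compared with the failing version.
object JavaVersionCheck {

  // Class file major version = Java feature release + 44
  // (for example Java 8 -> 52, Java 12 -> 56). Holds for Java 5 and later.
  def javaReleaseFor(classFileMajor: Int): Int = classFileMajor - 44

  def main(args: Array[String]): Unit = {
    // Version string of the running JVM, e.g. "1.8.0_212" on a Java 8 build
    println(s"java.version       = ${System.getProperty("java.version")}")
    // Highest class file version this JVM supports, "52.0" on Java 8
    println(s"java.class.version = ${System.getProperty("java.class.version")}")
    // Installation directory of the JVM that was picked up
    println(s"java.home          = ${System.getProperty("java.home")}")
    // The major version reported in the exception above
    println(s"major version 56   -> Java ${javaReleaseFor(56)}")
  }
}
```

If this reports a release newer than 8, pointing JAVA_HOME (or the IDE run configuration) at a Java 8 installation before launching the job is the likely fix. This assumes a Spark 2.4.x build, which the org.apache.xbean.asm6 frames in the stack trace appear to suggest.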