scala - Scastie reports the compiler error "value countByValue is not a member of org.apache.spark.sql.Dataset[String]"
Question
Hi, I am trying to compute a histogram of ratings using Scastie. Here is my implementation.
sbt settings in Scastie
scalacOptions ++= Seq(
"-deprecation",
"-encoding", "UTF-8",
"-feature",
"-unchecked"
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.3",
"org.apache.spark" %% "spark-sql" % "2.4.3"
)
Actual code in Scastie
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession
import org.apache.log4j._
object TestApp extends App {
lazy implicit val spark =
SparkSession.builder().master("local").appName("spark_test").getOrCreate()
import spark.implicits._ // Required to call the .toDF function later
val html = scala.io.Source.fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data").mkString // Get all rows as one string
val seqOfRecords = html.split("\n") // Split based on the newline characters
.filter(_ != "") // Filter out any empty lines
.toSeq // Convert to Seq so we can convert to DF later
.map(row => row.split("\t"))
.map { case Array(f1,f2,f3,f4) => (f1,f2,f3,f4) }
val df = seqOfRecords.toDF("col1", "col2", "col3", "col4")
val ratings = df.map(x => x.toString().split("\t")(2))
// Count up how many times each value (rating) occurs
val results = ratings.countByValue()
// Sort the resulting map of (rating, count) tuples
val sortedResults = results.toSeq.sortBy(_._1)
// Print each result on its own line.
sortedResults.foreach(println)
spark.close()
}
Error shown in Scastie
value countByValue is not a member of org.apache.spark.sql.Dataset[String]
============================================ The modified code now gives a different error in Scastie
java.lang.ExceptionInInitializerError
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
at TestApp$.delayedEndpoint$TestApp$1(main.scala:22)
at TestApp$delayedInit$body.apply(main.scala:4)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at TestApp$.main(main.scala:4)
at TestApp.main(main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sbt.Run.invokeMain(Run.scala:115)
at sbt.Run.execute$1(Run.scala:79)
at sbt.Run.$anonfun$runWithLoader$4(Run.scala:92)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at sbt.util.InterfaceUtil$$anon$1.get(InterfaceUtil.scala:10)
at sbt.TrapExit$App.run(TrapExit.scala:257)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
... 40 more
Here is the updated code in Scastie
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object TestApp extends App {
lazy implicit val spark = SparkSession.builder().master("local").appName("spark_test").getOrCreate()
import spark.implicits._ // Required to call the .toDF function later
val html = scala.io.Source.fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data").mkString // Get all rows as one string
val seqOfRecords = html.split("\n") // Split based on the newline characters
.filter(_ != "") // Filter out any empty lines
.toSeq // Convert to Seq so we can convert to DF later
.map(row => row.split("\t")) // Split each line on tab character to make an Array of 4 String each
.map { case Array(f1,f2,f3,f4) => (f1,f2,f3,f4) } // Convert each Array[String] into a (String, String, String, String) tuple
val df = seqOfRecords.toDF("col1", "col2", "col3", "col4") // Give whatever column names you want
df.select("col3").groupBy("col3").count.sort(col("count").desc).show()
spark.close() // don't forget to close(), otherwise scastie won't let you create another session so soon.
}
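Answer
By the time you reach the ratings variable, you are working with a Spark structure called a Dataset. Its API documentation describes what it can and cannot do. It has no method named countByValue, which is why you get the error you are seeing.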
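If you want to keep the countByValue approach, one option is to drop down to the underlying RDD, which does provide that method. The following is only a minimal sketch: the object name CountByValueSketch and the helper ratingCounts are hypothetical, and a small in-memory sample stands in for the downloaded file.

```scala
import org.apache.spark.sql.SparkSession

object CountByValueSketch {
  // Hypothetical helper: counts how often each rating string occurs.
  def ratingCounts(spark: SparkSession, ratings: Seq[String]): Seq[(String, Long)] = {
    import spark.implicits._
    val ds = ratings.toDS()             // Dataset[String], like `ratings` in the question
    // ds.countByValue()                // does not compile: Dataset has no such method
    val counts = ds.rdd.countByValue()  // the underlying RDD[String] does have countByValue()
    counts.toSeq.sortBy(_._1)           // sort the (rating, count) pairs by rating
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("count_by_value_sketch").getOrCreate()
    try ratingCounts(spark, Seq("3", "3", "1", "2", "1")).foreach(println)
    finally spark.close()
  }
}
```

Note that countByValue() collects the whole result map on the driver, so this is only reasonable when the number of distinct values is small, as it is for ratings.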
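Everything you have makes sense until you reach this line:
val ratings = df.map(x => x.toString().split("\t")(2))
This is where things currently go wrong: ratings is a Dataset[String], so the later call to countByValue() does not compile.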
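A separate issue with that line is that Row.toString renders the whole row as a single bracketed, comma-separated string, so there are no tab characters left to split on. A hedged sketch of reading the third column directly instead is shown below. The object name RowAccessSketch and the helper ratingsOf are hypothetical, and the two sample rows are taken from the table in this answer.

```scala
import org.apache.spark.sql.SparkSession

object RowAccessSketch {
  // Hypothetical helper: extracts the rating (third column) from each row.
  def ratingsOf(spark: SparkSession, rows: Seq[(String, String, String, String)]): Seq[String] = {
    import spark.implicits._
    val df = rows.toDF("col1", "col2", "col3", "col4")
    // Read the field by position instead of parsing Row.toString
    val ratings = df.map(row => row.getString(2)) // Dataset[String]
    // An equivalent alternative: df.select("col3").as[String]
    ratings.collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("row_access_sketch").getOrCreate()
    try {
      val sample = Seq(("196", "242", "3", "881250949"), ("186", "302", "3", "891717742"))
      ratingsOf(spark, sample).foreach(println)
    } finally spark.close()
  }
}
```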
If you go back to the df variable, you have a table that looks like this:
+----+----+----+---------+
|col1|col2|col3| col4|
+----+----+----+---------+
| 196| 242| 3|881250949|
| 186| 302| 3|891717742|
| 22| 377| 1|878887116|
| 244| 51| 2|880606923|
| 166| 346| 1|886397596|
+----+----+----+---------+
You can run df.show() to see a sample of the dataset. From there, I think you want something along the lines of groupBy. Look at some examples of it to see where to go next.
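As a rough illustration of the groupBy direction, the sketch below counts the rows per rating and returns the pairs sorted by rating. The object name GroupBySketch and the helper histogram are hypothetical, and the five sample rows are the ones from the table above rather than the full dataset.

```scala
import org.apache.spark.sql.SparkSession

object GroupBySketch {
  // Hypothetical helper: builds a (rating, count) histogram from col3.
  def histogram(spark: SparkSession, rows: Seq[(String, String, String, String)]): Seq[(String, Long)] = {
    import spark.implicits._
    val df = rows.toDF("col1", "col2", "col3", "col4")
    df.groupBy("col3")      // one group per distinct rating
      .count()              // result columns: col3, count
      .as[(String, Long)]   // tuple columns are matched by position
      .collect()
      .toSeq
      .sortBy(_._1)         // sort by rating
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("group_by_sketch").getOrCreate()
    try {
      val sample = Seq(
        ("196", "242", "3", "881250949"),
        ("186", "302", "3", "891717742"),
        ("22", "377", "1", "878887116"),
        ("244", "51", "2", "880606923"),
        ("166", "346", "1", "886397596")
      )
      histogram(spark, sample).foreach(println)
    } finally spark.close()
  }
}
```

Unlike countByValue, the aggregation here stays inside Spark SQL until collect() is called, so it can also be followed by further DataFrame operations such as sort or show.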