Scastie renders the compiler error "value countByValue is not a member of org.apache.spark.sql.Dataset[String]"

Problem description

Hi, I am trying to build a histogram of movie ratings in a Scastie program... here is the implementation.

sbt settings in Scastie

        scalacOptions ++= Seq(
          "-deprecation",
          "-encoding", "UTF-8",
          "-feature",
          "-unchecked"
        )

        libraryDependencies ++= Seq(
          "org.apache.spark" %% "spark-core" % "2.4.3",
          "org.apache.spark" %% "spark-sql" % "2.4.3"
        )

Actual code in Scastie

                import org.apache.spark.sql.SparkSession
                import org.apache.spark._
                import org.apache.spark.SparkContext._
                import org.apache.log4j._

                object TestApp extends App {
                  lazy implicit val spark =
                    SparkSession.builder().master("local").appName("spark_test").getOrCreate()

                  import spark.implicits._ // Required to call the .toDF function later

                  val html = scala.io.Source.fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data").mkString // Get all rows as one string
                  val seqOfRecords = html.split("\n") // Split based on the newline characters
                                 .filter(_ != "") // Filter out any empty lines
                                 .toSeq // Convert to Seq so we can convert to DF later
                                 .map(row => row.split("\t")) // Split each line on the tab character to get an Array of 4 Strings
                                 .map { case Array(f1,f2,f3,f4) => (f1,f2,f3,f4) } // Convert each Array[String] into a (String, String, String, String) tuple

                  val df = seqOfRecords.toDF("col1", "col2", "col3", "col4")

                  val ratings = df.map(x => x.toString().split("\t")(2))

                  // Count up how many times each value (rating) occurs
                  val results = ratings.countByValue()

                  // Sort the resulting map of (rating, count) tuples
                  val sortedResults = results.toSeq.sortBy(_._1)

                  // Print each result on its own line.
                  sortedResults.foreach(println)

                  spark.close()
                }

Error when running in Scastie

value countByValue is not a member of org.apache.spark.sql.Dataset[String]

Can someone help debug this?

============================================ The modified code now gives a different error in Scastie:

                    java.lang.ExceptionInInitializerError
                        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
                        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
                        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
                        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
                        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
                        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
                        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
                        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
                        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
                        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
                        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
                        at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
                        at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
                        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
                        at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
                        at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
                        at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
                        at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
                        at TestApp$.delayedEndpoint$TestApp$1(main.scala:22)
                        at TestApp$delayedInit$body.apply(main.scala:4)
                        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
                        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
                        at scala.App$$anonfun$main$1.apply(App.scala:76)
                        at scala.App$$anonfun$main$1.apply(App.scala:76)
                        at scala.collection.immutable.List.foreach(List.scala:392)
                        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
                        at scala.App$class.main(App.scala:76)
                        at TestApp$.main(main.scala:4)
                        at TestApp.main(main.scala)
                        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                        at java.lang.reflect.Method.invoke(Method.java:498)
                        at sbt.Run.invokeMain(Run.scala:115)
                        at sbt.Run.execute$1(Run.scala:79)
                        at sbt.Run.$anonfun$runWithLoader$4(Run.scala:92)
                        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
                        at sbt.util.InterfaceUtil$$anon$1.get(InterfaceUtil.scala:10)
                        at sbt.TrapExit$App.run(TrapExit.scala:257)
                        at java.lang.Thread.run(Thread.java:748)
                    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
                        at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
                        at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
                        at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
                        at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
                        at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
                        ... 40 more

Here is the updated code in Scastie

                import org.apache.spark.sql.SparkSession
                import org.apache.spark.sql.functions.col

                object TestApp extends App {
                  lazy implicit val spark = SparkSession.builder().master("local").appName("spark_test").getOrCreate()
                  
                  import spark.implicits._ // Required to call the .toDF function later
                  
                  val html = scala.io.Source.fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data").mkString // Get all rows as one string
                  val seqOfRecords = html.split("\n") // Split based on the newline characters
                                 .filter(_ != "") // Filter out any empty lines
                                 .toSeq // Convert to Seq so we can convert to DF later
                                 .map(row => row.split("\t")) // Split each line on tab character to make an Array of 4 String each
                                 .map { case Array(f1,f2,f3,f4) => (f1,f2,f3,f4) } // Convert that Array[String] into Array[(String, String, String, String)] 
                  
                  val df = seqOfRecords.toDF("col1", "col2", "col3", "col4") // Give whatever column names you want
                  
                  df.select("col3").groupBy("col3").count.sort(col("count").desc).show()

                  spark.close() // don't forget to close(), otherwise scastie won't let you create another session so soon.
                }

Tags: scala, apache-spark

Solution


By the time you reach the ratings variable, you are working with a Spark structure called a Dataset. You can look at the documentation describing what it can and cannot do: it has no method called countByValue, which is why you are getting the error you see.

Everything you have makes sense up until you reach this line:

val ratings = df.map(x => x.toString().split("\t")(2))

This is what is currently producing the error.
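If you want to keep a countByValue-style computation, one workaround (a sketch of mine, not part of the original answer, assuming the df and the import spark.implicits._ from the question are in scope) is to select the rating column and drop down to the underlying RDD, which does define countByValue:

    // Sketch: a Dataset has no countByValue, but the underlying RDD does.
    // "col3" is the rating column of the df built in the question.
    val ratings = df.select("col3").as[String].rdd   // DataFrame -> Dataset[String] -> RDD[String]

    val results = ratings.countByValue()             // Map[String, Long]: rating -> number of occurrences
    val sortedResults = results.toSeq.sortBy(_._1)   // sort by the rating value
    sortedResults.foreach(println)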

If you go back to the df variable, you have a table that looks like this:

+----+----+----+---------+
|col1|col2|col3|     col4|
+----+----+----+---------+
| 196| 242|   3|881250949|
| 186| 302|   3|891717742|
|  22| 377|   1|878887116|
| 244|  51|   2|880606923|
| 166| 346|   1|886397596|
+----+----+----+---------+
                  

You can run df.show() to see a sample of the data in that Dataset. From there, I think what you want looks a bit like a groupBy; have a look at some examples of that to see where to go next.
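For the groupBy route, a minimal sketch (my own, reusing the df and column names from the question) could look like this:

    import org.apache.spark.sql.functions.col

    // Sketch of the groupBy approach: count how many times each rating (col3) occurs,
    // then sort the counts by the rating value.
    val ratingCounts = df.groupBy("col3").count()    // one row per distinct rating, plus a "count" column
    ratingCounts.sort(col("col3")).show()            // use col("count").desc instead to sort by frequency

This is essentially what the updated code above already does with df.select("col3").groupBy("col3").count.sort(col("count").desc).show().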

