首页 > 解决方案 > 在 RDD 中添加列

问题描述

我正在尝试添加多个列(Int值)以根据全球销售额查找最高和最低销量的类型。

表格格式: Name , Platform , Year , Genre , Publisher , NA_Sales , EU_Sales , JP_Sales , Other_Sales

示例数据集:(公式)[Global Sales = NA_Sales + EU_Sales + JP_Sales]

示例输出:

最高销量类型:射击游戏全球销量(百万):27.57

最低销量类型:战略全球销量(百万):0.23

val vgdataLines = sc.textFile("hdfs:///user/ashhall1616/bdc_data/t1/vgsales-small.csv")
val vgdata = vgdataLines.map(_.split(";"))

val GlobalSales  = vgdata.map(r => r(3), r(5) + r(6) + r(7)). reduceByKey(_+_)

我在这里尝试使用的是通过键减少将总数减少NA_Sales + EU_Sales + JP_Sales到一个值,然后按流派减少。我创建GlobalSales了类型和总销售额。但是r(5) + r(6) + r(7)将值添加到字符串中。

Array[String] = Array(6.855.091.87, 9.034.280.13, 5.895.043.12, 9.673.730.11, 4.42.773.96, 0.180.140, 000.37, 0.20.070, 0.140.320.22, 0.140.110, 0.090.010.15
, 0.020.020.22, 0.140.110, 0.10.130, 0.140.110, 0.110.030, 0.130.020, 0.090.030, 0.060.040, 0.1200)

标签: scalaapache-sparkapache-spark-sqlrdd

解决方案


在此处使用来自此 stackoverflow的数据-(我相信这两个问题都使用相同的数据集)

使用 拆分数据后;,您会得到 ,Array[String]并且当您在创建时添加它时tuple,它将附加这些数字。您可以Double在创建元组时将这些字符串转换为。

代码

    val data =
      """Gran Turismo 3: A-Spec;PS2;2001;Racing;Sony Computer Entertainment;6.85;5.09;1.87;1.16
        |Call of Duty: Modern Warfare 3;X360;2011;Shooter;Activision;9.03;4.28;0.13;1.32
        |Pokemon Yellow: Special Pikachu Edition;GB;1998;Role-Playing;Nintendo;5.89;5.04;3.12;0.59
        |Call of Duty: Black Ops;X360;2010;Shooter;Activision;9.67;3.73;0.11;1.13
        |Pokemon HeartGold/Pokemon SoulSilver;DS;2009;Action;Nintendo;4.4;2.77;3.96;0.77
        |High Heat Major League Baseball 2003;PS2;2002;Sports;3DO;0.18;0.14;0;0.05
        |Panzer Dragoon;SAT;1995;Shooter;Sega;0;0;0.37;0
        |Corvette;GBA;2003;Racing;TDK Mediactive;0.2;0.07;0;0.01""".stripMargin

    val vgdataLines = spark.sparkContext.makeRDD(data.split("\n").toSeq)
    val vgdata = vgdataLines.map(_.split(";"))

    val GlobalSales  = vgdata.map(r => (r(3), r(5).toDouble + r(6).toDouble + r(7).toDouble)). reduceByKey(_+_)

    GlobalSales.foreach(println)

输出-

(Shooter,27.32)
(Role-Playing,14.05)
(Sports,0.32)
(Action,11.129999999999999)
(Racing,14.079999999999998)

根据评论中的要求更新1

 println("### min-max ###")
    val minSale = GlobalSales.min()(Ordering.by(_._2))
    val maxSale = GlobalSales.max()(Ordering.by(_._2))
    println(s"Highest selling Genre: '${maxSale._1}' Global Sale (in millions): '${maxSale._2}'.")
    println(s"Lowest selling Genre: '${minSale._1}' Global Sale (in millions): '${minSale._2}'.")

输出-

### min-max ###
Highest selling Genre: 'Shooter' Global Sale (in millions): '27.32'.
Lowest selling Genre: 'Sports' Global Sale (in millions): '0.32'.

一些解释-

  • GlobalSales是一个RDD[Tuple2[String, Double]。在执行maxmin处理元组时,它通常按顺序排序,即比较第一个值,然后比较第二个。在您的用例中,您直接希望在元组的第二个元素上收集最大值(以吨为单位的全局销售),因此要覆盖元组排序的默认行为,我们正在使用这个Ordering.by(_._2)

推荐阅读