首页 > 解决方案 > spark rdd级别中具有groupby的条件运算符 - scala

问题描述

我正在使用Spark 1.60Scala 2.10.5

我有一个这样的数据框,

+------------------+
|id | needed       | 
+------------------+
|1  | 2            |                                                                                                                                    
|1  | 0            |
|1  | 3            |
|2  | 0            |
|2  | 0            |
|3  | 1            |
|3  | 2            |                                                                                                    
+------------------+

从这里df我创造了一个rdd这样的,

 val  dfRDD = df.rdd

从我的rdd,我想分组id和计数的needed> 0

((1, 2), (2,0), (3,2))

所以,我试过这样,

val groupedDF = dfRDD.map(x =>(x(0), x(1) > 0)).count.redueByKey(_+_)

在这种情况下,我收到一个错误:

错误:值 > 不是任何成员

我需要那个rdd水平。获得我想要的输出的任何帮助都会很棒。

标签: scalaapache-sparkrdd

解决方案


问题是,在你map调用Row的apply 方法,正如你在它的scaladoc中看到的那样,该方法返回Any - 正如你所看到的错误和从scaladoc中没有这样的方法Any <

getAs[T]您可以使用该方法修复它。

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .getOrCreate()
import spark.implicits._

val df =
  List(
    (1, 2),
    (1, 0),
    (1, 3),
    (2, 0),
    (2, 0),
    (3, 1),
    (3, 2)
  ).toDF("id", "needed")

val rdd: RDD[(Int, Int)] = df.rdd.map(row => (row.getAs[Int](fieldName = "id"), row.getAs[Int](fieldName = "needed")))

从那里你可以继续聚合,你的逻辑有一些错误。
首先,您不需要count通话。
其次,如果您需要计算"needed"大于 1 的次数,则无法计算_ + _,因为这是所需值的总和。

val grouped: RDD[(Int, Int)] = rdd.reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }

val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))

PS:你应该告诉你的教授升级到 Spark 2 和 Scala 2.11 ;)

编辑

在上面的示例中使用案例类。

final case class Data(id: Int, needed: Int)
val rdd: RDD[Data] = df.as[Data].rdd
val grouped: RDD[(Int, Int)] = rdd.map(d => d.id -> d.needed).reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }  
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))

推荐阅读