scala - spark rdd级别中具有groupby的条件运算符 - scala
问题描述
我正在使用Spark 1.60
和Scala 2.10.5
我有一个这样的数据框,
+------------------+
|id | needed |
+------------------+
|1 | 2 |
|1 | 0 |
|1 | 3 |
|2 | 0 |
|2 | 0 |
|3 | 1 |
|3 | 2 |
+------------------+
从这里df
我创造了一个rdd
这样的,
val dfRDD = df.rdd
从我的rdd
,我想分组id
和计数的needed
是> 0
。
((1, 2), (2,0), (3,2))
所以,我试过这样,
val groupedDF = dfRDD.map(x =>(x(0), x(1) > 0)).count.redueByKey(_+_)
在这种情况下,我收到一个错误:
错误:值 > 不是任何成员
我需要那个rdd
水平。获得我想要的输出的任何帮助都会很棒。
解决方案
问题是,在你map
调用Row的apply
方法,正如你在它的scaladoc中看到的那样,该方法返回Any - 正如你所看到的错误和从scaladoc中没有这样的方法Any <
getAs[T]
您可以使用该方法修复它。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark =
SparkSession
.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df =
List(
(1, 2),
(1, 0),
(1, 3),
(2, 0),
(2, 0),
(3, 1),
(3, 2)
).toDF("id", "needed")
val rdd: RDD[(Int, Int)] = df.rdd.map(row => (row.getAs[Int](fieldName = "id"), row.getAs[Int](fieldName = "needed")))
从那里你可以继续聚合,你的逻辑有一些错误。
首先,您不需要count
通话。
其次,如果您需要计算"needed"
大于 1 的次数,则无法计算_ + _
,因为这是所需值的总和。
val grouped: RDD[(Int, Int)] = rdd.reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
PS:你应该告诉你的教授升级到 Spark 2 和 Scala 2.11 ;)
编辑
在上面的示例中使用案例类。
final case class Data(id: Int, needed: Int)
val rdd: RDD[Data] = df.as[Data].rdd
val grouped: RDD[(Int, Int)] = rdd.map(d => d.id -> d.needed).reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
推荐阅读
- php - 仅在 Woocommerce 结帐中未隐藏时才需要制作订单备注字段
- php - 如何将 reactJS 前端与 PHP 连接起来?
- python-2.7 - 我正在尝试使用烧瓶在 python 中使用 twitter 进行 Oauth 并且获取请求页面无效
- postgresql - 错误:SELECT 中类型字符变化(255)的值太长
- apache-camel - 如何防止来自 Camel“smtp”组件的 java 邮件“找不到预期的资源”警告?
- javascript - OpenLayers 移动和重新缩放图像 ol.interaction.Draw
- java - gradle 刷新后 jcenter 502 网关错误
- c# - 使用 csProj 发布到 NuGet:如何添加元数据?
- python - 如何使用 ElementTree 解析具有多个相同字段和嵌套数据的 XML 字符串?
- c# - 从 TextLine 获取评论