首页 > 解决方案 > 值 toDF 不是 org.apache.spark.rdd.RDD[(K, V)] 的成员

问题描述

注意:在以下问题的答案中给出的建议 无效 toDF is not a member of org.apache.spark.rdd.RDD value toDF is not a member of org.apache.spark.rdd.RDD[天气]

我正在尝试编写一个通用函数,该函数仅保留给定数据集中每个键的前 k 个值:

下面是代码:

def topKReduceByKey[K:ClassTag,V:Ordering](ds: Dataset[(K, V)], k: Int): Dataset[(K, V)] = {
    import sqlContext.implicits._
    ds
      .rdd
      .map(tuple => (tuple._1, Seq(tuple._2)))
      .reduceByKey((x, y) => (x ++ y).sorted(Ordering[V].reverse).take(k))
      .flatMap(tuple => tuple._2.map(v => (tuple._1, v)))
      .toDF("key", "value")
      .as[(K, V)]
  }

在运行此我收到以下错误消息:

Error:(43, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(K, V)]
possible cause: maybe a semicolon is missing before `value toDF'?
      .toDF("key", "value")

谁能帮我理解这里出了什么问题?

标签: scalaapache-spark

解决方案


有多种方法可以做到这一点(分组、分区、迭代分区),但只有在你喜欢自定义分区时才应该使用 RDD,对于其他任何你应该使用 Dataframe 或 Datasets 的情况。

我将使用 Dataframes 提供 Python 版本。对于带有数据集的 Scala 来说,这应该是一个很好的开始示例(API 相同)。

def topKByColumn(df, group_column, ordering_column, k):
    window = Window.partitionBy(df[group_column]).orderBy(df[ordering_column].desc())
    top_k = df.withColumn('rank', row_number().over(window))
    top_k = top_per[top_k.rank <= k]
    return top_k

推荐阅读