首页 > 解决方案 > 如何将 org.apache.spark.sql.DataFrame 转换为 org.apache.spark.rdd.RDD[Double]?

问题描述

我正在尝试将这个想法https://fullstackml.com/how-to-check-hypotheses-with-bootstrap-and-apache-spark-cd750775286a应用于我拥有的数据框。我正在使用的代码是这部分:

import scala.util.Sorting.quickSort

def getConfInterval(input: org.apache.spark.rdd.RDD[Double], N: Int, left: Double, right:Double)
            : (Double, Double) = {
    // Simulate by sampling and calculating averages for each of subsamples
    val hist = Array.fill(N){0.0}
    for (i <- 0 to N-1) {
        hist(i) = input.sample(withReplacement = true, fraction = 1.0).mean
    }

    // Sort the averages and calculate quantiles
    quickSort(hist)
    val left_quantile  = hist((N*left).toInt)
    val right_quantile = hist((N*right).toInt)
    return (left_quantile, right_quantile)
}

这运行正常,但是当我尝试将其应用于:

val data = mydf.map( _.toDouble )

val (left_qt, right_qt) = getConfInterval(data, 1000, 0.025, 0.975)

val H0_mean = 30
if (left_qt < H0_mean && H0_mean < right_qt) {
    println("We failed to reject H0. It seems like H0 is correct.")
} else {
    println("We rejected H0")
}

我收到一个错误

错误:值 toDouble 不是 org.apache.spark.sql.Row val data = dfTeste.map( _.toDouble ) 的成员

当我没有

.map( _.toDouble

我得到:

笔记本:4:错误:类型不匹配;找到:org.apache.spark.sql.DataFrame(扩展为)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] 需要:org.apache.spark.rdd.RDD[Double]

mydf 基本上是一个数据框,我只选择了一列(类型为 double,几行 0.0 或 1.0)

当我做:

dfTeste.map(x=>x.toString()).rdd

它成功地转向 org.apache.spark.rdd.RDD[String] 但我找不到为 Double 执行此操作的方法。我对此很陌生,所以如果没有多大意义,我深表歉意。

标签: apache-sparkapache-spark-sql

解决方案


显然val data = mydf.map( _.toDouble )不是一个RDD[Double]而是一个DataFrame

在您链接的示例中,他们使用

val data = dataWithHeader.filter( _ != header ).map( _.toDouble )

这是一个RDD[Double]sc.textFile返回一个RDD)

所以你需要转换mydf为 RDD,你可以这样做:

val data = mydf.map(r => r.getDouble(0)).rdd

推荐阅读