首页 > 解决方案 > 转换为 RDD 失败

问题描述

我的代码如下。我读了一个包含两列的 CSV 文件。通过转换为 RDD 循环遍历 Dataframe 的元素。现在我想为每个元素创建一个 DF。下面的代码失败。任何人都可以请帮忙。

    val df1 = spark.read.format("csv").load("c:\\file.csv") //CSV has 3 columns
     
    for (row <- df1.rdd.collect)
     {
       var tab1 =  row.mkString(",").split(",")(0) //Has Tablename
       var tab2 =  row.mkString(",").split(",")(1) //One Select Statment
       var tab3 =  row.mkString(",").split(",")(1) //Another Select Statment      
       
       val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
               
     }
    

我想将 tab2 数据框与 tab3 连接并附加表名。例如

在 tab2 和 tab3 中执行查询给出以下结果。

Col1     col2
---      ---
A         B
C         D
E         F
G         H

我想要如下:

Col0  Col1  Col2
----  ----   ---
Tab1   A      B
Tab1   C      D
Tab2   E      F
Tab3   G      h 

现在 tab1 tab2 tab2.. 等这些信息在正在读取的 CSV 文件中。我想将该 col0 转换为数据帧,以便我可以在 Spark Sql 中读取

标签: scalaentity-frameworkapache-spark

解决方案


我能够在下面解决我的替换问题:

val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working

经过

val newDf = spark.sparkContext.parallelize(Seq(newdf)).toDF("Col")

推荐阅读