首页 > 解决方案 > 在不使用 collect 的情况下有效地迭代 spark 数据帧

问题描述

我有一个包含 2000 万条记录的巨大数据框。我需要迭代数据框 df1 并逐行读取每一行,并根据 df1 的列值构造另外两个数据框 df2 和 df3 作为输出。

输入 - df1 有 20 列和 2000 万条记录。输出 -df2 有 4 列,将根据 df1 中的列值创建 2000 万条记录。输出 - df3 有 20 列,将根据 df1 中的列值创建 500-8 亿条记录。

目前的做法——

 //EMI Transaction Schema
 val emiDFSchema = StructType(
 StructField("AS_OF_DATE",StringType, false)::
 StructField("EST_END_DT",StringType, false)::
 StructField("EFF_DT",StringType, false)::
 StructField("NET_PRCPL_AMT",DecimalType, false)::
 ...
 ...
Nil)

 val emiDFSchema1 = StructType(
 StructField("AS_OF_DATE",StringType, false)::
 StructField("EST_END_DT",StringType, false)::
 StructField("EFF_DT",StringType, false)::
 StructField("NET_PRCPL_AMT",DecimalType, false)::
 ...
 ...
Nil)

val rwList = new ListBuffer[Row]()
val rwList1 = new ListBuffer[Row]()

df1.collect().map{row=>
       ..... 
        //calculation of attributes based on columns of df1
       ......
        //creating Row object for df2 with all calculated attributes
        val row = Row(attr1,attr2,....attr20)
        rwList+=(row)

        for(i<-1 to n){
          ...
          //calculation of attributes based on columns of df1
          ...
          //creating Row object for df3 with all calculated attributes
          val row1 = Row(attr1,attr2,....attr20)
          rwList1+=(row1)
         }
      }

     val emiDF1 = spark.createDataFrame(spark.sparkContext.parallelize(rwList),emiSchema)
     val emiDF2 = spark.createDataFrame(spark.sparkContext.parallelize(rwList1),emiSchema1)

由于 df1 很大,因此在其上执行 collect().map 会花费大量时间。您能否建议一种在更短的时间内有效地迭代 df1 的替代方法?

---Spark v2.4 ---Scala

标签: scalaapache-spark-sqlbigdata

解决方案


val df11 = df1.selectExpr(...)

数据框 selectExpr ...selectExpr 情况下! selectExpr + 参数


推荐阅读