首页 > 解决方案 > 在函数 Spark-Scala 中迭代计算

问题描述

我设计了一个对不同列进行一系列相等计算的函数。功能是:

import org.apache.spark.sql.DataFrame

def calculoVariables(table: DataFrame, variable: String): DataFrame = {
            
        val D_1 = "pl_"
        val D = "pos_" 
        val fxRate = "fxtoeur"
        val Acumulado = variable + "_ac"
        val varPositionD = D + variable
        val AcumuladoD_1 = D_1 + variable + "_ac"
        val LocalAvgAc= variable + "_localavg_ac"
        val LocalAvgAcD_1 = D_1 + variable + "_localavg_ac"
        
table.withColumn(Acumulado, when( ($"Tipo" === "Normal"), col(varPositionD))
                           .when( ($"Tipo" === "Nueva"),  col(varPositionD))
                           .when( ($"Tipo" === "Cancelada"), 0.0)
                           .otherwise( col(AcumuladoD_1)))
     .withColumn(LocalAvgAc, when( ($"Tipo" === "Normal"), (col(LocalAvgAcD_1) + ((col(Acumulado) - col(AcumuladoD_1)) * col(fxRate))))
                            .when( ($"Tipo" === "Cancelada"), 0.0)
                            .otherwise( (col(LocalAvgAcD_1) + ((col(Acumulado) - col(AcumuladoD_1)) * col(fxRate)  ))))                                  
      }

我希望您对单个 DataFrame 执行此操作,但对不同字段执行此操作,例如列表中的不同字段(“sales”、“prize”、“telephone”)。

这个指示值的函数工作得很好,但我不能让它迭代地做。

太感谢了!

标签: scalaapache-spark

解决方案


推荐阅读