scala - 在函数 Spark-Scala 中迭代计算
问题描述
我设计了一个对不同列进行一系列相等计算的函数。功能是:
import org.apache.spark.sql.DataFrame
def calculoVariables(table: DataFrame, variable: String): DataFrame = {
val D_1 = "pl_"
val D = "pos_"
val fxRate = "fxtoeur"
val Acumulado = variable + "_ac"
val varPositionD = D + variable
val AcumuladoD_1 = D_1 + variable + "_ac"
val LocalAvgAc= variable + "_localavg_ac"
val LocalAvgAcD_1 = D_1 + variable + "_localavg_ac"
table.withColumn(Acumulado, when( ($"Tipo" === "Normal"), col(varPositionD))
.when( ($"Tipo" === "Nueva"), col(varPositionD))
.when( ($"Tipo" === "Cancelada"), 0.0)
.otherwise( col(AcumuladoD_1)))
.withColumn(LocalAvgAc, when( ($"Tipo" === "Normal"), (col(LocalAvgAcD_1) + ((col(Acumulado) - col(AcumuladoD_1)) * col(fxRate))))
.when( ($"Tipo" === "Cancelada"), 0.0)
.otherwise( (col(LocalAvgAcD_1) + ((col(Acumulado) - col(AcumuladoD_1)) * col(fxRate) ))))
}
我希望您对单个 DataFrame 执行此操作,但对不同字段执行此操作,例如列表中的不同字段(“sales”、“prize”、“telephone”)。
这个指示值的函数工作得很好,但我不能让它迭代地做。
太感谢了!
解决方案
推荐阅读
- cython - 如何通过 cython 提供对 cpp 对象的绑定?
- php - Laravel 加入购物车问题
- python - Microsoft Teams:向用户发布直接消息以响应频道中的消息
- python - 如何使用 str.contains 函数使用行的索引替换单元格值
- python - 如何使用 .schema 显示 Django 创建的表?
- javascript - 循环通过自定义反应组件并向它们添加类名
- c# - Blazor Server 应用 + 外部登录 Facebook:用户名已被占用
- javascript - 类构造函数没有被调用
- javascript - 当主体更改为 DOM 元素时,循环无法正常工作
- python - Python enumerate 没有绘制正确的 matplotlib 数字?