Block cannot contain declarations

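Problem description

I'm new to Scala and am trying to create a UDF that returns a string of concatenated tuples built from its arguments.

It should look something like the code below, but there are a couple of problems: the compiler does not accept the variable "fines" declared without a value inside the block, and it does not let me reassign the amount that is passed in as a parameter.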

val calculateFines: UserDefinedFunction = udf((ids: Array[Long], values: Array[Double], amount: Double, complain_id: Long) => {
       var fines
       ids.indices foreach { i => {
         val (id, value) = (ids(i), values(i))
         val penalty = if (value > amount)  amount else  value
         amount =  amount - penalty
         fines = fines + (amount, id, complain_id, penalty).toString()
         if (amount <= 0) 
           break
        }
       }
       return fines
     })

Tags: scala, apache-spark, user-defined-functions

Solution

You can get your code working with a few fixes:

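import org.apache.spark.sql.functions.udf
import scala.util.control.Breaks._ // Scala has no built-in break, so breakable/break come from this import

// Note: depending on the Spark version, array columns may need to be declared as Seq[...] instead of Array[...]
val dysfunctional = udf((ids: Array[Long], values: Array[Double], amount: Double, complain_id: Long) => {
    var fines: String = "" // a var has to be initialized when it is declared
    var amountSum = amount // copy the parameter into a var, because parameters cannot be reassigned

    breakable {
      ids.indices foreach { i =>
        val (id, value) = (ids(i), values(i))
        val penalty = if (value > amountSum) amountSum else value // never charge more than what is left
        amountSum = amountSum - penalty
        fines = fines + (amountSum, id, complain_id, penalty).toString
        if (amountSum <= 0)
          break() // stop once the amount is used up
      }
    }
    fines // the last expression is the result; no return needed
  })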

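This works, but many people will frown on it because it is a very non-functional approach, whereas Scala encourages writing functional code. You might try changing it to something like this: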


val moreFunctional = (ids: Array[Long], values: Array[Double], amount: Double, complain_id: Long) => {

    val states = (ids, values)
      .zipped // pair each id with its value
      .toStream // switch to a stream so that the states are computed lazily
      .scanLeft((amount, "")) { // the initial state is the full amount and an empty string
        case ((remaining, fines), (id, value)) => // scanLeft's function receives the previous state and the current element
          val penalty = if (value > remaining) remaining else value
          val left = remaining - penalty
          (left, fines + (left, id, complain_id, penalty).toString) // the next state, passed to the following iteration
      }

    // stop at the first state where the amount is used up; if that never happens, take the final state
    val (_, fines) = states
      .find { case (remaining, _) => remaining <= 0 }
      .getOrElse(states.last)

    fines
  }
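
As a further illustration of the same idea, here is a minimal sketch that uses tail recursion instead of a stream and keeps the fines as structured values until the very end. It is plain Scala without Spark so that it can be run on its own. The names `FinesSketch`, `Fine`, `loop` and `render` are hypothetical and not part of the original code, and unlike the loop-based version it emits nothing at all when the starting amount is already zero or negative.

```scala
import scala.annotation.tailrec

object FinesSketch {
  // Hypothetical record for a single fine; introduced only for this sketch.
  final case class Fine(remaining: Double, id: Long, complainId: Long, penalty: Double)

  // Walks the (id, value) pairs in order, charging at most what is left of `amount`.
  def calculateFines(ids: Seq[Long], values: Seq[Double], amount: Double, complainId: Long): List[Fine] = {
    @tailrec
    def loop(pairs: List[(Long, Double)], remaining: Double, acc: List[Fine]): List[Fine] =
      pairs match {
        // Keep going only while something is left to distribute.
        case (id, value) :: rest if remaining > 0 =>
          val penalty = math.min(value, remaining)
          val left = remaining - penalty
          loop(rest, left, Fine(left, id, complainId, penalty) :: acc)
        // Either the pairs or the amount ran out: return the fines in input order.
        case _ => acc.reverse
      }

    loop(ids.zip(values).toList, amount, Nil)
  }

  // Concatenates the fines as tuple strings, matching the format used above.
  def render(fines: List[Fine]): String =
    fines.map(f => (f.remaining, f.id, f.complainId, f.penalty).toString).mkString
}
```

For example, with ids 1, 2, 3, values 4.0, 8.0, 5.0, an amount of 10.0 and complaint id 7, `render(calculateFines(...))` yields "(6.0,1,7,4.0)(0.0,2,7,6.0)": the third id is never charged because the amount is already used up. To call this from Spark, one would presumably wrap the two functions in `udf(...)` in the same way as above.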
