首页 > 解决方案 > 如何在 withColumn 中重用表达式

问题描述

我想按距离过滤经纬度坐标。我有以下表达。

.withColumn("a",
        // Haversine distance
        pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
          cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
            pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2))
      .withColumn("diff_b_dropoff_a_pickup", atan2(sqrt($"a"), sqrt(-$"a" + 1)) * 2 * 6371)
      .drop("a")
      .filter($"diff_b_pickup_a_dropoff" < lit(max_dist))

现在,我创建一个列,以进一步将该列用于下一个withColumn表达式,只是删除第一个表达式并过滤第二个表达式。换句话说,我触摸了 4 次东西,我确信我可以更简单地做到这一点。我该如何重写这个?我现在遇到的主要问题是我不能(我可以,但是我计算"a"了两次)计算第二个withColumn,因为我需要第一个两次..

是否可以重用/或声明一个变量,以便我可以使用第二个withColumn作为我的过滤条件?

谢谢!

标签: scalaapache-spark-sql

解决方案


该函数的有趣之处之一withColumn是它可以采用org.apache.spark.sql.Column导致 spark sqlColumn类型的表达式。因此,您实际上可以将多个withColumn函数转换为不同的表达式Columns。换句话说,可以组合多个列来制作更复杂的列表达式。

让我们看一个对数据框中的列进行平方然后加倍的简单示例:

// here is a piece of code which squares and doubles col_2 in (some) dataframe
// with an integer col_2, similar to your code.

val square = df.withColumn("square", $"col_2" * $"col_2")
val squareAndDouble = square.withColumn("square_and_double", $"square" * 2)
val res1 = squareAndDouble.drop("square")
res1.show()

+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
|    1|    1|                2|
|    2|    2|                8|

这也可以使用列表达式来完成。这是因为该withColumn函数采用导致 Column 类型的列表达式(如上所述)。

// Here's the signature of the withColumn function in spark-scala.
def withColumn(colName: String, col: Column): DataFrame

所以我们可以编写我们的 square 和 double 代码如下:

// Again squaring and doubling col_2, but this time using column expressions.

val sqCol = $"col_2" * $"col_2"
val sqAndDoubleCol = sqCol * 2
val res2 = df.withColumn("square_and_double", sqAndDoubleCol)

res2.show()
+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
|    1|    1|                2|
|    2|    2|                8|

由于您的代码实际上并未使用转换为列的文字(火花列类型的点亮函数),我猜测计算半正弦距离的表达式正在返回一个org.apache.spark.sql.Column类型。因此,我们可以重写上面的代码,如下所示:

val haversineDistance : org.apache.spark.sql.Column = 
pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
          cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
            pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2)

val dropoffDistance : org.apache.spark.sql.Column = atan2(sqrt(haversineDistance), sqrt(-haversineDistance + 1)) * 2 * 6371)

df.withColumn(“dropoffDistance”, dropoffDistance)
  .filter(…)
  :
  :

注1:如果直接使用上面的代码,请您检查一下。我对实际的领域逻辑和计算并不精通。我只是想完成我的答案并展示火花规模代码的使用。

注 2:在这种情况下,实际的火花物理计划不应改变。不过,您可以在解析和优化中节省一些时间,因为您明确告诉 spark 要计算什么。因此,我不认为这是优化此代码以提高性能的一种方式。但是,如果您重新使用半正弦距离,它将有助于重用和可读性。

注意 3:无法检查列名以及它们是否实际上属于数据框(即使在编译时)。这是此 API 的一个缺点。列名可以是任何字符串。在您实际运行它之前,它不会检查是否将其与数据框相关联。spark api文档中也提到了这一点:

col("columnName")           // A generic column no yet associated with a DataFrame.

所以请注意这一点并仔细检查列名。如果可以,请在将其投入生产之前运行一些测试。


推荐阅读