scala - 如何在 withColumn 中重用表达式
问题描述
我想按距离过滤经纬度坐标。我有以下表达。
.withColumn("a",
// Haversine distance
pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2))
.withColumn("diff_b_dropoff_a_pickup", atan2(sqrt($"a"), sqrt(-$"a" + 1)) * 2 * 6371)
.drop("a")
.filter($"diff_b_pickup_a_dropoff" < lit(max_dist))
现在,我创建一个列,以进一步将该列用于下一个withColumn
表达式,只是删除第一个表达式并过滤第二个表达式。换句话说,我触摸了 4 次东西,我确信我可以更简单地做到这一点。我该如何重写这个?我现在遇到的主要问题是我不能(我可以,但是我计算"a"
了两次)计算第二个withColumn
,因为我需要第一个两次..
是否可以重用/或声明一个变量,以便我可以使用第二个withColumn
作为我的过滤条件?
谢谢!
解决方案
该函数的有趣之处之一withColumn
是它可以采用org.apache.spark.sql.Column
导致 spark sqlColumn
类型的表达式。因此,您实际上可以将多个withColumn
函数转换为不同的表达式Columns
。换句话说,可以组合多个列来制作更复杂的列表达式。
让我们看一个对数据框中的列进行平方然后加倍的简单示例:
// here is a piece of code which squares and doubles col_2 in (some) dataframe
// with an integer col_2, similar to your code.
val square = df.withColumn("square", $"col_2" * $"col_2")
val squareAndDouble = square.withColumn("square_and_double", $"square" * 2)
val res1 = squareAndDouble.drop("square")
res1.show()
+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
| 1| 1| 2|
| 2| 2| 8|
这也可以使用列表达式来完成。这是因为该withColumn
函数采用导致 Column 类型的列表达式(如上所述)。
// Here's the signature of the withColumn function in spark-scala.
def withColumn(colName: String, col: Column): DataFrame
所以我们可以编写我们的 square 和 double 代码如下:
// Again squaring and doubling col_2, but this time using column expressions.
val sqCol = $"col_2" * $"col_2"
val sqAndDoubleCol = sqCol * 2
val res2 = df.withColumn("square_and_double", sqAndDoubleCol)
res2.show()
+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
| 1| 1| 2|
| 2| 2| 8|
由于您的代码实际上并未使用转换为列的文字(火花列类型的点亮函数),我猜测计算半正弦距离的表达式正在返回一个org.apache.spark.sql.Column
类型。因此,我们可以重写上面的代码,如下所示:
val haversineDistance : org.apache.spark.sql.Column =
pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2)
val dropoffDistance : org.apache.spark.sql.Column = atan2(sqrt(haversineDistance), sqrt(-haversineDistance + 1)) * 2 * 6371)
df.withColumn(“dropoffDistance”, dropoffDistance)
.filter(…)
:
:
注1:如果直接使用上面的代码,请您检查一下。我对实际的领域逻辑和计算并不精通。我只是想完成我的答案并展示火花规模代码的使用。
注 2:在这种情况下,实际的火花物理计划不应改变。不过,您可以在解析和优化中节省一些时间,因为您明确告诉 spark 要计算什么。因此,我不认为这是优化此代码以提高性能的一种方式。但是,如果您重新使用半正弦距离,它将有助于重用和可读性。
注意 3:无法检查列名以及它们是否实际上属于数据框(即使在编译时)。这是此 API 的一个缺点。列名可以是任何字符串。在您实际运行它之前,它不会检查是否将其与数据框相关联。spark api文档中也提到了这一点:
col("columnName") // A generic column no yet associated with a DataFrame.
所以请注意这一点并仔细检查列名。如果可以,请在将其投入生产之前运行一些测试。
推荐阅读
- c - 如何禁止其他 Linux 内核模块访问某些常规文件?
- python-3.x - Python3 无法识别我的文件夹路径,即使它与终端显示的内容匹配
- c++ - 虚函数“方法”的返回类型与它覆盖的函数的返回类型不协变 c++
- excel - Phpspreadsheet:下载不适用于 .csv 和 .pdf 文件
- node.js - 执行 docker run 命令时如何将 env 变量传递给 json 文件
- c# - 更新的递归函数
- mysql - Spring Boot:如何使用谓词创建涉及“加入”和“分组依据”的动态查询
- c - 使用带有 -g 标志的 gcc 在 c 中编译内联汇编时,编译器抛出“缺少输出操作数约束”
- django - 在 StructBlock 中使用 MultiFieldPanel
- excel - 在下一个工作表循环中,SpecialCells(xlCellTypeVisible) 范围应读取为 NOTHING