首页 > 解决方案 > SparkSQL 基于表达式创建一个新列

问题描述

我有一个名为 ipTraffic 的数据框,其架构如下:

ipTraffic: org.apache.spark.sql.DataFrame = [ip: string, record_count: double]

我正在尝试创建一个新列,该列取列的最大值 "record_count"并除以该行的记录计数值。

我跑了:

val calc = ipTraffic.agg(max("record_count")) / (ipTraffic("record_count"))
ipTraffic = ipTraffic.withColumn("weight", expr(calc))

val calc = ipTraffic.agg(max("record_count")).divide(ipTraffic("record_count"))
ipTraffic = ipTraffic.withColumn("weight", expr(calc))`

并得到一个错误

error: value / is not a member of org.apache.spark.sql.DataFrame

这对我来说没有意义,因为除法肯定是火花(显然),但我去了https://spark.apache.org/docs/2.3.0/api/sql/无论如何都找到了它并且“/”是包括。

标签: apache-sparkapache-spark-sql

解决方案


您尝试将数据框与列分开:

ipTraffic.agg(max("record_count")):

+-----------------+
|max(record_count)| 
+-----------------+
|              3.0|
+-----------------+ 

除以:

ipTraffic("record_count"):
+------------+
|record_count|
+------------+
|         1.0|
|         2.0|
|         3.0|
|         1.0|
|         2.0|
|         3.0|
+------------+

相反,您可以先计算最大值,将其作为文字值获取,然后在计算中使用它:

import spark.implicits._     
val maxRecordCount = ipTraffic.agg(max($"record_count")).first.getDouble(0)
val ipTrafficWithWeight = ipTraffic.withColumn("weight", lit(maxRecordCount) / $"record_count")

推荐阅读