apache-spark - SparkSQL 基于表达式创建一个新列
问题描述
我有一个名为 ipTraffic 的数据框,其架构如下:
ipTraffic: org.apache.spark.sql.DataFrame = [ip: string, record_count: double]
我正在尝试创建一个新列,该列取列的最大值
"record_count"
并除以该行的记录计数值。
我跑了:
val calc = ipTraffic.agg(max("record_count")) / (ipTraffic("record_count"))
ipTraffic = ipTraffic.withColumn("weight", expr(calc))
和
val calc = ipTraffic.agg(max("record_count")).divide(ipTraffic("record_count"))
ipTraffic = ipTraffic.withColumn("weight", expr(calc))`
并得到一个错误
error: value / is not a member of org.apache.spark.sql.DataFrame
这对我来说没有意义,因为除法肯定是火花(显然),但我去了https://spark.apache.org/docs/2.3.0/api/sql/无论如何都找到了它并且“/”是包括。
解决方案
您尝试将数据框与列分开:
ipTraffic.agg(max("record_count")):
+-----------------+
|max(record_count)|
+-----------------+
| 3.0|
+-----------------+
除以:
ipTraffic("record_count"):
+------------+
|record_count|
+------------+
| 1.0|
| 2.0|
| 3.0|
| 1.0|
| 2.0|
| 3.0|
+------------+
相反,您可以先计算最大值,将其作为文字值获取,然后在计算中使用它:
import spark.implicits._
val maxRecordCount = ipTraffic.agg(max($"record_count")).first.getDouble(0)
val ipTrafficWithWeight = ipTraffic.withColumn("weight", lit(maxRecordCount) / $"record_count")
推荐阅读
- android - 在 Flutter 项目中获取 iOS 和 Android 的 AdMob 测试设备 ID
- php - Laravel 大文件上传不起作用,我收到待处理的请求
- java - Cannot understand how to implement Memoization of a function with Guava
- r - R - 按月过滤数据
- python - 在python中的某个索引处更改第二个列表中的元素
- c# - C#多线程出队列表
- c++ - C ++:静态成员函数为具有私有构造函数的类返回自静态对象
- python - Python Pandas:尝试在 date_range 操作中加快每个日期的每行
- python - 如何使用 tf.keras 模型从 keras 模型复制结果(或找到差异)?
- java - 使用 CompletableFuture 的并发数据库调用