scala - Spark Dataframe Scala:通过某些条件添加新列
问题描述
我修改了我的问题,以便更容易理解。
原始 df 如下所示:
+---+----------+-------+----+------+
| id|tim |price | qty|qtyChg|
+---+----------+-------+----+------+
| 1| 31951.509| 0.370| 1| 1|
| 2| 31951.515|145.380| 100| 100|
| 3| 31951.519|149.370| 100| 100|
| 4| 31951.520|144.370| 100| 100|
| 5| 31951.520|119.370| 5| 5|
| 6| 31951.520|149.370| 300| 200|
| 7| 31951.521|149.370| 400| 100|
| 8| 31951.522|149.370| 410| 10|
| 9| 31951.522|149.870| 50| 50|
| 10| 31951.522|109.370| 50| 50|
| 11| 31951.522|144.370| 400| 300|
| 12| 31951.524|149.370| 610| 200|
| 13| 31951.526|135.130| 22| 22|
| 14| 31951.527|149.370| 750| 140|
| 15| 31951.528| 89.370| 100| 100|
| 16| 31951.528|145.870| 50| 50|
| 17| 31951.528|139.370| 100| 100|
| 18| 31951.531|144.370| 410| 10|
| 19| 31951.531|149.370| 769| 19|
| 20| 31951.538|149.370| 869| 100|
| 21| 31951.538|144.880| 200| 200|
| 22| 31951.541|139.370| 221| 121|
| 23| 31951.542|149.370|1199| 330|
| 24| 31951.542|139.370| 236| 15|
| 25| 31951.542|144.370| 510| 100|
| 26| 31951.543|146.250| 50| 50|
| 27| 31951.543|143.820| 100| 100|
| 28| 31951.543|139.370| 381| 145|
| 29| 31951.544|149.370|1266| 67|
| 30| 31951.544|150.000| 50| 50|
| 31| 31951.544|137.870| 300| 300|
| 32| 31951.544|140.470| 10| 10|
| 33| 31951.545|150.000| 53| 3|
| 34| 31951.545|140.000| 25| 25|
| 35| 31951.545|148.310| 8| 8|
| 36| 31951.547|149.000| 20| 20|
| 37| 31951.549|143.820| 102| 2|
| 38| 31951.549|150.110| 75| 75|
+---+----------+-------+----+------+
然后我运行代码
val ww = Window.partitionBy().orderBy($"tim")
val step1 = df.withColumn("sequence",sort_array(collect_set(col("price")).over(ww),asc=false))
.withColumn("top1price",col("sequence").getItem(0))
.withColumn("top2price",col("sequence").getItem(1))
.drop("sequence")
新的数据框如下所示:
+---+---------+-------+----+------+---------+---------+
| id| tim| price| qty|qtyChg|top1price|top2price|
+---+---------+-------+----+------+---------+---------+
| 1|31951.509| 0.370| 1| 1| 0.370| null|
| 2|31951.515|145.380| 100| 100| 145.380| 0.370|
| 3|31951.519|149.370| 100| 100| 149.370| 145.380|
| 4|31951.520|149.370| 300| 200| 149.370| 145.380|
| 5|31951.520|144.370| 100| 100| 149.370| 145.380|
| 6|31951.520|119.370| 5| 5| 149.370| 145.380|
| 7|31951.521|149.370| 400| 100| 149.370| 145.380|
| 8|31951.522|109.370| 50| 50| 149.870| 149.370|
| 9|31951.522|144.370| 400| 300| 149.870| 149.370|
| 10|31951.522|149.870| 50| 50| 149.870| 149.370|
| 11|31951.522|149.370| 410| 10| 149.870| 149.370|
| 12|31951.524|149.370| 610| 200| 149.870| 149.370|
| 13|31951.526|135.130| 22| 22| 149.870| 149.370|
| 14|31951.527|149.370| 750| 140| 149.870| 149.370|
| 15|31951.528| 89.370| 100| 100| 149.870| 149.370|
| 16|31951.528|139.370| 100| 100| 149.870| 149.370|
| 17|31951.528|145.870| 50| 50| 149.870| 149.370|
| 18|31951.531|144.370| 410| 10| 149.870| 149.370|
| 19|31951.531|149.370| 769| 19| 149.870| 149.370|
| 20|31951.538|144.880| 200| 200| 149.870| 149.370|
| 21|31951.538|149.370| 869| 100| 149.870| 149.370|
| 22|31951.541|139.370| 221| 121| 149.870| 149.370|
| 23|31951.542|144.370| 510| 100| 149.870| 149.370|
| 24|31951.542|139.370| 236| 15| 149.870| 149.370|
| 25|31951.542|149.370|1199| 330| 149.870| 149.370|
| 26|31951.543|139.370| 381| 145| 149.870| 149.370|
| 27|31951.543|143.820| 100| 100| 149.870| 149.370|
| 28|31951.543|146.250| 50| 50| 149.870| 149.370|
| 29|31951.544|140.470| 10| 10| 150.000| 149.870|
| 30|31951.544|137.870| 300| 300| 150.000| 149.870|
| 31|31951.544|150.000| 50| 50| 150.000| 149.870|
| 32|31951.544|149.370|1266| 67| 150.000| 149.870|
| 33|31951.545|140.000| 25| 25| 150.000| 149.870|
| 34|31951.545|150.000| 53| 3| 150.000| 149.870|
| 35|31951.545|148.310| 8| 8| 150.000| 149.870|
| 36|31951.547|149.000| 20| 20| 150.000| 149.870|
| 37|31951.549|150.110| 75| 75| 150.110| 150.000|
| 38|31951.549|143.820| 102| 2| 150.110| 150.000|
+---+---------+-------+----+------+---------+---------+
我希望得到两个新列 top1priceQty、top2priceQty,它们存储了 top1price 和 top2price 的最新对应数量。
例如,在第 6 行,top1price=149.370,根据这个值,我想得到它对应的数量是 400(不是 100 或 300)。在第 33 行中,当 top1price=150.00000000 时,我想获得其对应的数量,即来自第 32 行的 53,而不是来自第 28 行的 50。同样的规则适用于 top2price
谢谢大家!
解决方案
您自己非常接近答案。而不是只收集一列的集合,而是收集“LMTPRICE”数组及其对应的“数量”。然后对 top1price 使用 getItem(0).getItem(0),对 top1priceQty 使用 getItem(0).getItem(1)。要在 INTEREST_TIME 之前保持订单以获得正确的数量,请在 'LMTPRICE' 之后和 'qty' 之前使用 INTEREST_TIME。
df.withColumn("sequence",sort_array(collect_set(array("LMTPRICE","INTEREST_TIME","qty")).over(ww),asc=false)).withColumn("top1price",col("sequence").getItem(0).getItem(0)).withColumn("top1priceQty",col("sequence").getItem(0).getItem(2).cast("int")).drop("sequence").show(false)
+-----+-------------+--------+---+------+---------+------------+
|index|INTEREST_TIME|LMTPRICE|qty|qtyChg|top1price|top1priceQty|
+-----+-------------+--------+---+------+---------+------------+
|0 |31951.509 |0.37 |1 |1 |0.37 |1 |
|1 |31951.515 |145.38 |100|100 |145.38 |100 |
|2 |31951.519 |149.37 |100|100 |149.37 |100 |
|3 |31951.52 |119.37 |5 |5 |149.37 |300 |
|4 |31951.52 |144.37 |100|100 |149.37 |300 |
|5 |31951.52 |149.37 |300|200 |149.37 |300 |
|6 |31951.521 |149.37 |400|100 |149.37 |400 |
|7 |31951.522 |149.87 |50 |50 |149.87 |50 |
|8 |31951.522 |149.37 |410|10 |149.87 |50 |
|9 |31951.522 |109.37 |50 |50 |149.87 |50 |
|10 |31951.522 |144.37 |400|300 |149.87 |50 |
|11 |31951.524 |149.87 |610|200 |149.87 |610 |
|12 |31951.526 |135.13 |22 |22 |149.87 |610 |
|13 |31951.527 |149.37 |750|140 |149.87 |610 |
|14 |31951.528 |139.37 |100|100 |149.87 |610 |
|15 |31951.528 |145.87 |50 |50 |149.87 |610 |
|16 |31951.528 |89.37 |100|100 |149.87 |610 |
|17 |31951.531 |144.37 |410|10 |149.87 |610 |
|18 |31951.531 |149.37 |769|19 |149.87 |610 |
|19 |31951.538 |149.37 |869|100 |149.87 |610 |
+-----+-------------+--------+---+------+---------+------------+
推荐阅读
- google-play-services - 我应该如何处理添加新的增量成就
- php - 从 Postgresql 计算 PHP 查询
- html - 如何在 flex 框中使内容可滚动,以便另一个框保持恒定宽度
- arm - 如何在 Bazel 中配置工具链
- angular - 如何从订阅另一个函数的可观察对象的函数返回布尔值?
- c# - 在文本文件中的每一行的第 n 个位置插入字符
- mysql - 有人可以向我解释这个mysql代码吗
- javascript - 分配器分配新值的问题
- regex - 正则表达式引擎没有全局过滤器并且不接受标志
- node.js - 映射地理点在 ElasticSearch 中不起作用