apache-spark - Applying changes consecutively to a Spark dataframe
Question
I have a dataframe named init that holds the initial state. I have another dataframe with the same schema, in which each row updates one field of an init record and leaves the other fields null. How can I apply the changes consecutively to rebuild each record? To make this clearer, here is an example:
listOfTuples = [("1", "Status_0", "2019", "value_col_4", 0)]
init = spark.createDataFrame(listOfTuples, ["id", "status", "year", "col_4", "ord"])
#initial state
>>> init.show()
+---+--------+----+-----------+---+
| id| status|year| col_4|ord|
+---+--------+----+-----------+---+
| 1|Status_0|2019|value_col_4| 0|
+---+--------+----+-----------+---+
#dataframe with changes
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([StructField('id', StringType(), True),
                     StructField('status', StringType(), True),
                     StructField('year', StringType(), True),
                     StructField('col_4', StringType(), True),
                     StructField('ord', IntegerType(), True)])
listOfTuples = [("1", "Status_A", None, None, 1),
                ("1", "Status_B", None, None, 2),
                ("1", None, None, "new_val", 3),
                ("1", "Status_C", None, None, 4)]
changes = spark.createDataFrame(listOfTuples, schema)
>>> changes.show()
+---+--------+----+-------+---+
| id| status|year| col_4|ord|
+---+--------+----+-------+---+
| 1|Status_A|null| null| 1|
| 1|Status_B|null| null| 2|
| 1| null|null|new_val| 3|
| 1|Status_C|null| null| 4|
+---+--------+----+-------+---+
I want these changes applied to the final dataframe consecutively, in the order given by the ord column, with the values of the init dataframe as the baseline. So I would like my final dataframe to look like this:
>>> final.show()
+---+--------+----+-----------+
| id|  status|year|      col_4|
+---+--------+----+-----------+
|  1|Status_0|2019|value_col_4|
|  1|Status_A|2019|value_col_4|
|  1|Status_B|2019|value_col_4|
|  1|Status_B|2019|    new_val|
|  1|Status_C|2019|    new_val|
+---+--------+----+-----------+
I was thinking of unioning the two dataframes, ordering by the ord column, and then somehow propagating the changes downwards. Does anyone know how to do this?
Solution
This is Scala code, but I hope it helps. At the end you can drop or rename the columns. The solution is to do a union and then, for each of the 3 columns, take the last non-null value with org.apache.spark.sql.functions.last over a window frame running from unboundedPreceding to currentRow.
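The key primitive here is last(col, ignoreNulls = true) evaluated over a frame that grows from the start of the partition to the current row: for each row it returns the most recent non-null value seen so far. A minimal plain-Python sketch of that forward-fill semantics (the helper name last_non_null is hypothetical, not part of any Spark API):

```python
def last_non_null(values):
    """For each position i, return the last non-None element of values[0..i].

    This mimics Spark's last(col, ignoreNulls=true) over a frame of
    (unboundedPreceding, currentRow): nulls are skipped, so each change
    row inherits the most recent known value of the column.
    """
    out, current = [], None
    for v in values:
        if v is not None:
            current = v
        out.append(current)
    return out

# The "status" column of the unioned dataframe, ordered by ord:
status = ["Status_0", "Status_A", "Status_B", None, "Status_C"]
print(last_non_null(status))
# The null row at ord=3 keeps "Status_B", matching the expected output.
```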
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._
scala> initial.show
+---+--------+----+-----------+---+
| id| status|year| col_4|ord|
+---+--------+----+-----------+---+
| 1|Status_0|2019|value_col_4| 0|
+---+--------+----+-----------+---+
scala> changes.show
+---+--------+----+-------+---+
| id| status|year| col_4|ord|
+---+--------+----+-------+---+
| 1|Status_A|null| null| 1|
| 1|Status_B|null| null| 2|
| 1| null|null|new_val| 3|
| 1|Status_C|null| null| 4|
+---+--------+----+-------+---+
scala> val inter = initial.union(changes)
inter: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, status: string ... 3 more fields]
scala> inter.show
+---+--------+----+-----------+---+
| id| status|year| col_4|ord|
+---+--------+----+-----------+---+
| 1|Status_0|2019|value_col_4| 0|
| 1|Status_A|null| null| 1|
| 1|Status_B|null| null| 2|
| 1| null|null| new_val| 3|
| 1|Status_C|null| null| 4|
+---+--------+----+-----------+---+
scala> val overColumns = Window.partitionBy("id").orderBy("ord").rowsBetween(Window.unboundedPreceding, Window.currentRow)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70f4b378
scala> val output = inter.withColumn("newstatus",
last("status", true).over(overColumns)).withColumn("newyear",
last("year", true).over(overColumns)).withColumn("newcol_4",
last("col_4", true).over(overColumns))
output: org.apache.spark.sql.DataFrame = [id: string, status: string ... 6 more fields]
scala> output.show(false)
+---+--------+----+-----------+---+---------+-------+-----------+
|id |status |year|col_4 |ord|newstatus|newyear|newcol_4 |
+---+--------+----+-----------+---+---------+-------+-----------+
|1 |Status_0|2019|value_col_4|0 |Status_0 |2019 |value_col_4|
|1 |Status_A|null|null |1 |Status_A |2019 |value_col_4|
|1 |Status_B|null|null |2 |Status_B |2019 |value_col_4|
|1 |null |null|new_val |3 |Status_B |2019 |new_val |
|1 |Status_C|null|null |4 |Status_C |2019 |new_val |
+---+--------+----+-----------+---+---------+-------+-----------+