Using a PySpark DataFrame to look up values in one array by index and copy them into another array

Problem description

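This DataFrame contains two arrays: discount_applications and line_items. Each line_items element has an inner array named discount_allocations, whose elements carry a field named discount_application_index. The goal is to use each discount_application_index value as an index into the discount_applications array, take the "type" value found at that position, and copy it into the application_type field of the same discount_allocations element.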

Here is the DataFrame:

records = '[{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"},{"type":"manual3"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]},{"discount_allocations":[{"application_type":"","discount_application_index":3}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}}]'
df = spark.read.json(sc.parallelize([records]))
df.printSchema()
df.show(truncate=False)

root
 |-- _c: struct (nullable = true)
 |    |-- discount_applications: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- discount_allocations: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- application_type: string (nullable = true)
 |    |    |    |    |    |-- discount_application_index: long (nullable = true)

+--------------------------------------------------------------------------------------------+
|_c                                                                                          |
+--------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[, 0]]], [[[, 1]]], [[[, 2]]], [[[, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
+--------------------------------------------------------------------------------------------+

After the transformation, the DataFrame should look like this:

+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+

Tags: sql, dataframe, apache-spark, pyspark, apache-spark-sql

Solution


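Just keep the nesting straight and use transform :) The transform higher-order function (available in Spark SQL since 2.4) is applied twice: the outer call walks line_items, and the inner call walks discount_allocations and rebuilds each element, looking up the type in _c.discount_applications by discount_application_index.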

import pyspark.sql.functions as F

df2 = df.withColumn(
    '_c', 
    F.expr("""
        struct(
            _c.discount_applications,
            transform(
                _c.line_items,
                x -> struct(
                    transform(
                        x.discount_allocations,
                        y -> struct(
                            _c.discount_applications[int(y.discount_application_index)].type as application_type,
                            y.discount_application_index as discount_application_index
                        )
                    ) as discount_allocations
                )
            ) as line_items
        )
    """)
)

df2.show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+
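
To sanity-check the expected result on a small sample without a Spark session, the same index lookup can be sketched in plain Python. This is only an illustrative cross-check, not part of the original answer: the helper name fill_application_types and the shortened sample record are hypothetical, and returning None for an out-of-range index is an assumption of this sketch.

```python
import copy
import json


def fill_application_types(record):
    """Return a copy of one record with application_type filled in by index lookup.

    Hypothetical helper for illustration; it mirrors the nested transform logic.
    """
    result = copy.deepcopy(record)  # leave the input record untouched
    body = result["_c"]
    applications = body["discount_applications"]
    for item in body["line_items"]:  # corresponds to the outer transform
        for allocation in item["discount_allocations"]:  # the inner transform
            index = allocation["discount_application_index"]
            # Assumption of this sketch: an out-of-range index yields None
            in_range = 0 <= index < len(applications)
            allocation["application_type"] = (
                applications[index]["type"] if in_range else None
            )
    return result


# A shortened sample in the same shape as the question's records
sample = json.loads(
    '{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"}],'
    '"line_items":['
    '{"discount_allocations":[{"application_type":"","discount_application_index":1}]},'
    '{"discount_allocations":[{"application_type":"","discount_application_index":0}]}]}}'
)

filled = fill_application_types(sample)
types = [
    allocation["application_type"]
    for item in filled["_c"]["line_items"]
    for allocation in item["discount_allocations"]
]
print(types)  # ['manual1', 'manual0']
```

The sample deliberately lists the indices out of order (1, then 0) to show that the value is picked by index rather than by position in line_items.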
