首页 > 解决方案 > 根据来自第二个数据帧的匹配键将列表附加到 Spark 数据帧列

问题描述

我有 2 个具有相同列名的 spark 数据框,并且希望在键列相互匹配时使用 df2 中同一列中的列表来扩展第一个 df 中的某些列。

df1:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1  |  k2  |list1  | list2   |list3|list4   |list5   |list 6 |
+----+---+--+-------+---------------------------------+-------+
|   a| 121  |[car1] |[price1] |[1]  |[False] |[0.000] |[vfdvf]|
|   b| 11   |[car3] |[price3] |[2]  |[False] |[1.000] |[00000]|
|   c| 23   |[car3] |[price3] |[4]  |[False] |[2.500] |[fdabh]|
|   d| 250  |[car6] |[price6] |[6]  |[True]  |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+


df2:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1  |  k2  |list1  | list2   |list3|list4   |list5   |list 6 |
+----+---+--+-------+---------------------------------+-------+
|   m| 121  |[car5] |[price5] |[5]  |[False] |[3.000] |[vfdvf]|
|   b| 11   |[car8] |[price8] |[8]  |[False] |[2.000] |[mnfaf]|
|   c| 23   |[car7] |[price7] |[7]  |[False] |[1.500] |[00000]|
|   n| 250  |[car9] |[price9] |[9]  |[False] |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+

由于带有项目列表的列相互关联,因此订单必须保持不变。只有当key1和key2在两个dfs之间匹配时,有没有一种方法可以将整个列表从df2附加到df1?

结果应如下所示(我无法放入列表 6 列,但希望在结果中看到与其他列表列具有相同模式的结果):

   +--+--+-----------+---------------+-----+------------+--------------+
   |k1|k2|list1      | list2         |list3|list4       |list5        |
   +--+--+-----------+---------------+-----+------------+--------------+
   |b |11|[car3,car8]|[price3,price8]|[2,8]|[False,False]|[1.000,2.000]| 
   |c |23|[car3,car7]|[price3,price7]|[4,7]|[False,False]|[2.500,1.500]| 
   +--+--+-----------+---------------+-----+-------------+-------------+

我仍然是使用 UDF 的新手,并且在 stackoverflow 上找不到类似的问题,发现的唯一类似的 QI 是使用 pandas(如何在合并 DataFrames 时合并两个列表列?),这对我的用例来说非常慢。对此的任何见解将不胜感激。

标签: pysparkapache-spark-sqluser-defined-functionspyspark-dataframes

解决方案


我找到了我的问题的答案,并想将其发布在此处与可能面临相同问题的其他人分享,以供我将来参考。

    from pyspark.sql.types import BooleanType
    from pyspark.sql.types import StringType
    from pyspark.sql.types import DoubleType
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import ArrayType
    from pyspark.sql.types import LongType
    from pyspark.sql.types import ByteType
    
    def concatTypesFunc(array1, array2): 
        final_array=array1+array2
        return final_array

    spark.udf.register("concat_types", concatTypesFunc, 
    ArrayType(BooleanType())
    spark.udf.register("concat_types", concatTypesFunc, 
    ArrayType(StringType())
    spark.udf.register("concat_types", concatTypesFunc, 
    ArrayType(DoubleType())
    spark.udf.register("concat_types", concatTypesFunc, 
    ArrayType(IntegerType())
    spark.udf.register("concat_types", concatTypesFunc, 
    ArrayType(LongType())
    spark.udf.register("concat_types", concatTypesFunc, 
    ByteType(LongType()) 

    df= spark.createDataFrame(
        [
     (  "a", 121  ,["car1"] ,["price1"] ,[1]  ,["False"] ,[0.000] ,["vfdvf"]),
    (   "b", 11   ,["car3"] ,["price3"] ,[2]  ,["False"] ,[1.000] ,[00000]),
    (   "c", 23   ,["car3"] ,["price3"] ,[4]  ,["False"] ,[2.500] ,["fdabh"]),
    (   "d", 250  ,["car6"] ,["price6"] ,[6]  ,["True"]  ,[0.450] ,[00000])
           
            ],table_schema
        )
    
    df2= spark.createDataFrame(
        [
     ("m", 121  ,["car5"] ,["price5"] ,[5]  ,["False"] ,[3.000] ,["vfdvf"]),
    (   "b", 11   ,["car8"] ,["price8"] ,[8]  ,["False"] ,[2.000] ,["mnfaf"]),
    (   "c", 23   ,["car7"] ,["price7"] ,[7]  ,["False"] ,[1.500] ,[00000]),
    (  "n", 250  ,["car9"] ,["price9"] ,[9]  ,["False"] ,[0.450] ,[00000])
    
    ],table_schema
        )
    df.createOrReplaceTempView("a")
    df2.createOrReplaceTempView("b")
    spark.sql("select a.key1, a.key2, concat_types(a.list1,b.list1)List1 ,concat_types(a.list2,b.list2)List2, \
    concat_types(a.list3,b.list3)List3 ,concat_types(a.list4,b.list4)List4,\
              concat_types(a.list5,b.list5)List5 ,\
              concat_types(a.list6,b.list6)List6 \
    from a inner join b on a.key1=b.key1 order by a.key1").show(truncate=False)

 +----+----+------------+----------------+------+--------------+----------+----------+
|key1|key2|List1       |List2           |List3 |List4         |List5     |List6     |
+----+----+------------+----------------+------+--------------+----------+----------+
|b   |11  |[car3, car8]|[price3, price8]|[2, 8]|[False][False]|[1.0, 2.0]|[0, mnfaf]|
|c   |23  |[car3, car7]|[price3, price7]|[4, 7]|[False][False]|[2.5, 1.5]|[fdabh, 0]|
+----+----+------------+----------------+------+--------------+----------+----------+


推荐阅读