pyspark - 根据来自第二个数据帧的匹配键将列表附加到 Spark 数据帧列
问题描述
我有 2 个具有相同列名的 spark 数据框,并且希望在键列相互匹配时使用 df2 中同一列中的列表来扩展第一个 df 中的某些列。
df1:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1 | k2 |list1 | list2 |list3|list4 |list5 |list 6 |
+----+---+--+-------+---------------------------------+-------+
| a| 121 |[car1] |[price1] |[1] |[False] |[0.000] |[vfdvf]|
| b| 11 |[car3] |[price3] |[2] |[False] |[1.000] |[00000]|
| c| 23 |[car3] |[price3] |[4] |[False] |[2.500] |[fdabh]|
| d| 250 |[car6] |[price6] |[6] |[True] |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+
df2:
+----+---+--++------+---------+-----+--------+--------+-------+
|k1 | k2 |list1 | list2 |list3|list4 |list5 |list 6 |
+----+---+--+-------+---------------------------------+-------+
| m| 121 |[car5] |[price5] |[5] |[False] |[3.000] |[vfdvf]|
| b| 11 |[car8] |[price8] |[8] |[False] |[2.000] |[mnfaf]|
| c| 23 |[car7] |[price7] |[7] |[False] |[1.500] |[00000]|
| n| 250 |[car9] |[price9] |[9] |[False] |[0.450] |[00000]|
+----+---+--++----+---+--+--++----+---+------+----------------+
由于带有项目列表的列相互关联,因此订单必须保持不变。只有当key1和key2在两个dfs之间匹配时,有没有一种方法可以将整个列表从df2附加到df1?
结果应如下所示(我无法放入列表 6 列,但希望在结果中看到与其他列表列具有相同模式的结果):
+--+--+-----------+---------------+-----+------------+--------------+
|k1|k2|list1 | list2 |list3|list4 |list5 |
+--+--+-----------+---------------+-----+------------+--------------+
|b |11|[car3,car8]|[price3,price8]|[2,8]|[False,False]|[1.000,2.000]|
|c |23|[car3,car7]|[price3,price7]|[4,7]|[False,False]|[2.500,1.500]|
+--+--+-----------+---------------+-----+-------------+-------------+
我仍然是使用 UDF 的新手,并且在 stackoverflow 上找不到类似的问题,发现的唯一类似的 QI 是使用 pandas(如何在合并 DataFrames 时合并两个列表列?),这对我的用例来说非常慢。对此的任何见解将不胜感激。
解决方案
我找到了我的问题的答案,并想将其发布在此处与可能面临相同问题的其他人分享,以供我将来参考。
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
from pyspark.sql.types import LongType
from pyspark.sql.types import ByteType
def concatTypesFunc(array1, array2):
final_array=array1+array2
return final_array
spark.udf.register("concat_types", concatTypesFunc,
ArrayType(BooleanType())
spark.udf.register("concat_types", concatTypesFunc,
ArrayType(StringType())
spark.udf.register("concat_types", concatTypesFunc,
ArrayType(DoubleType())
spark.udf.register("concat_types", concatTypesFunc,
ArrayType(IntegerType())
spark.udf.register("concat_types", concatTypesFunc,
ArrayType(LongType())
spark.udf.register("concat_types", concatTypesFunc,
ByteType(LongType())
df= spark.createDataFrame(
[
( "a", 121 ,["car1"] ,["price1"] ,[1] ,["False"] ,[0.000] ,["vfdvf"]),
( "b", 11 ,["car3"] ,["price3"] ,[2] ,["False"] ,[1.000] ,[00000]),
( "c", 23 ,["car3"] ,["price3"] ,[4] ,["False"] ,[2.500] ,["fdabh"]),
( "d", 250 ,["car6"] ,["price6"] ,[6] ,["True"] ,[0.450] ,[00000])
],table_schema
)
df2= spark.createDataFrame(
[
("m", 121 ,["car5"] ,["price5"] ,[5] ,["False"] ,[3.000] ,["vfdvf"]),
( "b", 11 ,["car8"] ,["price8"] ,[8] ,["False"] ,[2.000] ,["mnfaf"]),
( "c", 23 ,["car7"] ,["price7"] ,[7] ,["False"] ,[1.500] ,[00000]),
( "n", 250 ,["car9"] ,["price9"] ,[9] ,["False"] ,[0.450] ,[00000])
],table_schema
)
df.createOrReplaceTempView("a")
df2.createOrReplaceTempView("b")
spark.sql("select a.key1, a.key2, concat_types(a.list1,b.list1)List1 ,concat_types(a.list2,b.list2)List2, \
concat_types(a.list3,b.list3)List3 ,concat_types(a.list4,b.list4)List4,\
concat_types(a.list5,b.list5)List5 ,\
concat_types(a.list6,b.list6)List6 \
from a inner join b on a.key1=b.key1 order by a.key1").show(truncate=False)
+----+----+------------+----------------+------+--------------+----------+----------+
|key1|key2|List1 |List2 |List3 |List4 |List5 |List6 |
+----+----+------------+----------------+------+--------------+----------+----------+
|b |11 |[car3, car8]|[price3, price8]|[2, 8]|[False][False]|[1.0, 2.0]|[0, mnfaf]|
|c |23 |[car3, car7]|[price3, price7]|[4, 7]|[False][False]|[2.5, 1.5]|[fdabh, 0]|
+----+----+------------+----------------+------+--------------+----------+----------+
推荐阅读
- reactjs - 使用 Draft-js 时传递给孩子后缺少道具
- php - php如何发布带有2个变量的日期范围选择器
- spring-boot - 关于 spring.http.multipart.max-file-size 与 spring.servlet.multipart.max-file-size 的混淆
- android - RecyclerView wrap_content 不适用于 API 23 及更高版本
- javascript - aframe 0.8.2 加载 VR 作为魔法窗口/novr 耳机/进入 VR 而不分屏
- excel - 图表有时会导出到空白 .jpg 文件
- python - 熊猫填充到空字典
- state-machine - Enterprise Architect 可以从源代码逆向工程状态机图吗?
- javascript - 在javascript中使用动态键名推送数组值
- sql - SQL Server 考勤卡查询