首页 > 解决方案 > 将结构类型传递给 spark sql 数据帧中的方法或 UDFS

问题描述

我有两个数据框,我加入了它们,加入加入的数据框后,我有两列是结构类型的。基本上它们是 Array[[String,Int]]。我需要根据这种结构类型的元素派生第三列。

我的代码如下所示。

val bdf = Seq(
 ("a",1,1,10)
,("a",1,2,10)
,("a",1,3,10)
,("a",1,4,10)
,("b",1,1,20)
,("b",1,2,10)
,("a",2,3,10)
,("a",2,4,20)
,("a",2,5,20)
,("c",2,1,10)
,("c",2,2,20)
,("c",2,3,20)
).toDF("contract_number","linenumber","monthdel","open_quant")

val gbdf = bdf.withColumn("bmergedcol",struct(bdf("monthdel"),bdf("open_quant"))).groupBy("contract_number","linenumber").agg(collect_list("bmergedcol"))





val pl = Seq(
("a",1,"FLAT",10)
,("a",1,"FLAT",30)
,("a",1,"NFE",10)
,("b",1,"FLAT",10)
,("b",1,"NFE",10)
,("c",2,"NFE",10)
,("a",3,"NFE",20)
,("c",2,"FLAT",20)).toDF("connum","linnum","type","qnt")

import org.apache.spark.sql.functions._ 
val gpl = pl.withColumn("mergedcol",struct(pl("type"),pl("qnt"))).groupBy("connum","linnum").agg(collect_list("mergedcol"))


val jdf = gbdf.join(gpl,expr("((contract_number = connum) AND (linenumber = linnum ))"),"left_outer")

我的 jdf 输出就像

在此处输入图像描述

我需要了解如何将两个结构类型字段传递给某个方法并从中派生第三个?

标签: scaladataframeapache-sparkapache-spark-sql

解决方案


用户定义函数(又名 UDF)是 Spark SQL 的一项功能,用于定义新的基于列的函数来转换数据集。UDF 可用于传递两个结构类型字段以得出结果。

val customUdf = udf((col1: Seq[Row], col2: Int) => {
  // This is an example.
  col1(1).getAs[String]("type") + "--" + col2
})
val cdf = jdf.withColumn("custom", customUdf(jdf.col("collect_list(mergedcol)"), jdf.col("linnum")))
cdf.show(10)

在上面的 udf col1 是 Seq[Row] 因为它是一个结构类型的数组,如果只需要访问结构类型而不是简单的 Row 应该使用。


推荐阅读