首页 > 解决方案 > SCALA - 如何在结构中调用函数?

问题描述

我构建了一个数据框,它将有一个实际上是结构的 String 列,转换为 JSON 字符串。

val df = df_helper.select(
                     lit("some data").as("id"),
                     to_json(
                          struct(
                              col("id"),
                              col("type"),
                              col("path")
                          )
                     )).as("content")

我还构建了一个函数,它将标识符id:String作为参数并输出一个字符串列表。

def buildHierarchy(id:String) : List[String] = {

  val check_df = hierarchy_df.select(
    $"parent_id",
    $"id"
  ).where($"id" === id)

  val pathArray = List(id)
  val parentString = check_df.select($"parent_id").first.getString(0)

  if (parentString == null) {
    return pathArray
  }
  else {
    val pathList = buildHierarchy(parentString)
    val finalList: List[String] = pathList ++ pathArray
    return finalList
  }
}

我想调用这个函数并将路径列替换为函数的结果。这是可能的,还是有解决方法?

先感谢您!

标签: scalafunctionapache-sparkstructdatabricks

解决方案


在以下博客文章的帮助下,我建立了层次结构:

https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/

我在最终的数据框中包含了所有必要的信息,我需要为结构的每个元素构建详细信息,以及每个元素的层次结构路径。然后作为一个新列,我为每个元素创建了一个包含详细信息的结构。

val content_df = hierarchy_df.select($"path")
                                    .withColumn("content", 
                                           struct( col("id"),
                                                   col("type"),
                                                   col("path")
                                            ))

我分解了路径以到达其中的每个标识符,但保留位置顺序,以加入路径中每个级别的内容。

val exploded_df = content_df.select(
                         $"*",posexplode($"path"), 
                         $"pos".as("exploded_level"),
                         $"col".as("exploded_id"))

最后,将内容与路径连接起来,并将内容聚合成一个路径,该路径包含每个级别的所有内容。

val level_content_df = exploded_df.as("e")
           .join(content_df.as("ouc"), col("e.col") === col("ouc.id"), "left")
                      .select($"e.*", $"ouc.content".as("content"))

val pathFull_df = level_content_df
            .groupBy(col("id").as("id"))
            .agg(sort_array(collect_list("content")).as("pathFull"))

然后最后我又把整个事情放在一起:)

val content_with_path_df = content_df.as("ou")
              .join(pathFull_df.as("opf"), col("ou.id") === col("opf.id"), "left")
              .select($"ou.*", $"opf.pathFull".as("pathFull"))

如果它没有意义,请随时与我联系,我花了一段时间才实现它!:D


推荐阅读