首页 > 解决方案 > 如何使用 foreach 拆分 Spark 数据框中的 Json 格式列值

问题描述

我想将 JSON 格式列结果拆分为 Spark 数据框:

allrules_internalHive 中的表:

----------------------------------------------------------------
|tablename  |                 condition            | filter     |
|---------------------------------------------------------------|
| documents | {"col_list":"document_id,comments"}  | NA         |
| person    | {"per_list":"person_id, name, age"}  | NA         |
 ---------------------------------------------------------------

代码:

val allrulesDF = spark.read.table("default" + "." + "allrules_internal")
allrulesDF.show()

val df1 = allrulesDF.select(allrulesDF.col("tablename"), allrulesDF.col("condition"), allrulesDF.col("filter"), allrulesDF.col("dbname")).collect()

在这里,我想拆分condition列值。从上面的示例中,我想保留“document_id,comments”部分。换句话说,条件列有一个键/值对,但我只想要值部分。

如果表中超过一行allrules_internal如何拆分值。

  df1.foreach(row => { 
     //   condition = row.getAs("condition").toString() // here how to retrive ?
       println(condition)
       val tableConditionDF = spark.sql("SELECT "+ condition + " FROM " + db_name + "." + table_name)
       tableConditionDF.show()
 })

标签: scalaapache-sparkapache-spark-sqlapache-spark-2.0

解决方案


您可以使用以下from_json功能:

import org.apache.spark.sql.functions._
import spark.implicits._

allrulesDF
  .withColumn("condition", from_json($"condition", StructType(Seq(StructField("col_list", DataTypes.StringType, true)))))
  .select($"tablename", $"condition.col_list".as("condition"))

它将打印:

+---------+---------------------+
|tablename|condition            |
+---------+---------------------+
|documents|document_id, comments|
+---------+---------------------+

解释:

使用该withColumn方法,您可以使用组合一列或多列的函数来创建新列。在本例中,我们使用的from_json函数接收包含 JSON 字符串的列和一个StructType对象,该列中表示 JSON 字符串的模式。最后,您只需选择所需的列。

希望它有所帮助!


推荐阅读