首页 > 解决方案 > 连接单独处理的火花数据帧的两列时的顺序保证是什么?

问题描述

我有 3 列的数据框

  1. 日期
  2. jsonString1
  3. jsonString2

我想将 json 中的属性扩展为列。所以我做了这样的事情。

 val json1 = spark.read.json(dataframe.select(col("jsonString1")).rdd.map(_.getString(0)))
 val json2 = spark.read.json(dataframe.select(col("jsonString2")).rdd.map(_.getString(0)))

 val json1Table = json1.selectExpr("id", "status")
 val json2Table = json2.selectExpr("name", "address")

现在我想把这些表放在一起。所以我做了以下


     val json1TableWithIndex = addColumnIndex(json1Table)
     val json2TableWithIndex = addColumnIndex(json2Table)
     var finalResult = json1Table
            .join(json2Table, Seq("columnindex"))
            .drop("columnindex")

    def addColumnIndex(df: DataFrame) = spark.createDataFrame(
        df.rdd.zipWithIndex.map { case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) },
        StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
    )

在对几行进行采样后,我观察到行与源数据帧中的完全匹配,当连接单独处理的数据帧的两列时,我没有找到任何关于订单保证的信息。这是解决我的问题的正确方法。任何帮助表示赞赏。

标签: scalaapache-sparkapache-spark-sqlamazon-emr

解决方案


依赖未记录的行为总是有风险的,因为您的代码可能无法按您的预期工作,因为您对它只有部分了解。

您可以在不使用任何拆分和连接方法的情况下以更有效的方式做同样的事情。使用from_json函数创建两个嵌套列,然后将嵌套列展平,最后丢弃中间 JSON 字符串列和嵌套列。

这是整个过程的示例。

import org.apache.spark.sql.types.{StringType, StructType, StructField}

val df = (Seq( 
("09-02-2020","{\"id\":\"01\", \"status\":\"Active\"}","{\"name\":\"Abdullah\", \"address\":\"Jumeirah\"}"), 
("10-02-2020","{\"id\":\"02\", \"status\":\"Dormant\"}","{\"name\":\"Ali\", \"address\":\"Jebel Ali\"}") 
).toDF("date","jsonString1","jsonString2"))

scala> df.show()
+----------+--------------------+--------------------+
|      date|         jsonString1|         jsonString2|
+----------+--------------------+--------------------+
|09-02-2020|{"id":"01", "stat...|{"name":"Abdullah...|
|10-02-2020|{"id":"02", "stat...|{"name":"Ali", "a...|
+----------+--------------------+--------------------+

val schema1 = (StructType(Seq(
  StructField("id", StringType, true), 
  StructField("status", StringType, true)
)))

val schema2 = (StructType(Seq(
  StructField("name", StringType, true), 
  StructField("address", StringType, true)
)))


val dfFlattened = (df.withColumn("jsonData1", from_json(col("jsonString1"), schema1))
            .withColumn("jsonData2", from_json(col("jsonString2"), schema2))
            .withColumn("id", col("jsonData1.id"))
            .withColumn("status", col("jsonData1.status"))
            .withColumn("name", col("jsonData2.name"))
            .withColumn("address", col("jsonData2.address"))
            .drop("jsonString1")
            .drop("jsonString2")
            .drop("jsonData1")
            .drop("jsonData2"))         

scala> dfFlattened.show()
+----------+---+-------+--------+---------+
|      date| id| status|    name|  address|
+----------+---+-------+--------+---------+
|09-02-2020| 01| Active|Abdullah| Jumeirah|
|10-02-2020| 02|Dormant|     Ali|Jebel Ali|
+----------+---+-------+--------+---------+   

推荐阅读