scala - 连接单独处理的火花数据帧的两列时的顺序保证是什么?
问题描述
我有 3 列的数据框
- 日期
- jsonString1
- jsonString2
我想将 json 中的属性扩展为列。所以我做了这样的事情。
val json1 = spark.read.json(dataframe.select(col("jsonString1")).rdd.map(_.getString(0)))
val json2 = spark.read.json(dataframe.select(col("jsonString2")).rdd.map(_.getString(0)))
val json1Table = json1.selectExpr("id", "status")
val json2Table = json2.selectExpr("name", "address")
现在我想把这些表放在一起。所以我做了以下
val json1TableWithIndex = addColumnIndex(json1Table)
val json2TableWithIndex = addColumnIndex(json2Table)
var finalResult = json1Table
.join(json2Table, Seq("columnindex"))
.drop("columnindex")
def addColumnIndex(df: DataFrame) = spark.createDataFrame(
df.rdd.zipWithIndex.map { case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) },
StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)
在对几行进行采样后,我观察到行与源数据帧中的完全匹配,当连接单独处理的数据帧的两列时,我没有找到任何关于订单保证的信息。这是解决我的问题的正确方法。任何帮助表示赞赏。
解决方案
依赖未记录的行为总是有风险的,因为您的代码可能无法按您的预期工作,因为您对它只有部分了解。
您可以在不使用任何拆分和连接方法的情况下以更有效的方式做同样的事情。使用from_json
函数创建两个嵌套列,然后将嵌套列展平,最后丢弃中间 JSON 字符串列和嵌套列。
这是整个过程的示例。
import org.apache.spark.sql.types.{StringType, StructType, StructField}
val df = (Seq(
("09-02-2020","{\"id\":\"01\", \"status\":\"Active\"}","{\"name\":\"Abdullah\", \"address\":\"Jumeirah\"}"),
("10-02-2020","{\"id\":\"02\", \"status\":\"Dormant\"}","{\"name\":\"Ali\", \"address\":\"Jebel Ali\"}")
).toDF("date","jsonString1","jsonString2"))
scala> df.show()
+----------+--------------------+--------------------+
| date| jsonString1| jsonString2|
+----------+--------------------+--------------------+
|09-02-2020|{"id":"01", "stat...|{"name":"Abdullah...|
|10-02-2020|{"id":"02", "stat...|{"name":"Ali", "a...|
+----------+--------------------+--------------------+
val schema1 = (StructType(Seq(
StructField("id", StringType, true),
StructField("status", StringType, true)
)))
val schema2 = (StructType(Seq(
StructField("name", StringType, true),
StructField("address", StringType, true)
)))
val dfFlattened = (df.withColumn("jsonData1", from_json(col("jsonString1"), schema1))
.withColumn("jsonData2", from_json(col("jsonString2"), schema2))
.withColumn("id", col("jsonData1.id"))
.withColumn("status", col("jsonData1.status"))
.withColumn("name", col("jsonData2.name"))
.withColumn("address", col("jsonData2.address"))
.drop("jsonString1")
.drop("jsonString2")
.drop("jsonData1")
.drop("jsonData2"))
scala> dfFlattened.show()
+----------+---+-------+--------+---------+
| date| id| status| name| address|
+----------+---+-------+--------+---------+
|09-02-2020| 01| Active|Abdullah| Jumeirah|
|10-02-2020| 02|Dormant| Ali|Jebel Ali|
+----------+---+-------+--------+---------+
推荐阅读
- c# - 使用 C++ dll 运行 c# 应用程序会生成无法加载异常
- android - Android 快速 Fragments 替换导致卡顿
- html - 垂直对齐弹性项目的最佳实践
- php - 来自日期时间集合的规则
- scala - 将 Any 转换为数组
- css - 图片右上角的按钮
- objective-c - NSUserDefaults 和 boolean - 这是怎么回事?目标 C
- visual-studio-code - 如何在 VSCode 扩展中使用原生 Electron WebView 标签?
- android - MPAndroidChart - 时间线条形图 Android
- vba - SAP如何在创建用于VBA的BOM时查找行数