json - Spark 2.0 (not 2.1) Dataset[Row] or Dataframe - 选择几列到 JSON
问题描述
我有一个包含 10 列的 Spark Dataframe,我需要将它存储在 Postgres/RDBMS 中。该表有 7 列,第 7 列采用文本(JSON 格式)以供进一步处理。
如何选择 6 列并将 DF 中的剩余 4 列转换为 JSON 格式?
如果要将整个 DF 存储为 JSON,那么我们可以使用 DF.write.format("json"),但只有最后 4 列需要为 JSON 格式。
我尝试创建一个 UDF(使用 Jackson 或 Lift lib),但未能成功将 4 列发送到 UDF。
对于 JSON,DF 列名称是键,DF 列的值是值。
例如:
dataset name: ds_base
root
|-- bill_id: string (nullable = true)
|-- trans_id: integer (nullable = true)
|-- billing_id: decimal(3,-10) (nullable = true)
|-- asset_id: string (nullable = true)
|-- row_id: string (nullable = true)
|-- created: string (nullable = true)
|-- end_dt: string (nullable = true)
|-- start_dt: string (nullable = true)
|-- status_cd: string (nullable = true)
|-- update_start_dt: string (nullable = true)
I want to do,
ds_base
.select ( $"bill_id",
$"trans_id",
$"billing_id",
$"asset_id",
$"row_id",
$"created",
?? <JSON format of 4 remaining columns>
)
解决方案
您可以使用struct
和to_json
:
import org.apache.spark.sql.functions.{to_json, struct}
to_json(struct($"end_dt", $"start_dt", $"status_cd", $"update_start_dt"))
作为旧 Spark 版本的解决方法,您可以将整个对象转换为 JSON 并提取所需的内容:
import org.apache.spark.sql.functions.get_json_object
// List of column names to be kept as-is
val scalarColumns: Seq[String] = Seq("bill_id", "trans_id", ...)
// List of column names to be put in JSON
val jsonColumns: Seq[String] = Seq(
"end_dt", "start_dt", "status_cd", "update_start_dt"
)
// Convert all records to JSON, keeping selected fields as a nested document
val json = df.select(
scalarColumns.map(col _) :+
struct(jsonColumns map col: _*).alias("json"): _*
).toJSON
json.select(
// Extract selected columns from JSON field and cast to required types
scalarColumns.map(c =>
get_json_object($"value", s"$$.$c").alias(c).cast(df.schema(c).dataType)) :+
// Extract JSON struct
get_json_object($"value", "$.json").alias("json"): _*
)
这仅在您具有原子类型时才有效。或者,您可以使用标准 JSON 阅读器并为 JSON 字段指定模式。
import org.apache.spark.sql.types._
val combined = df.select(
scalarColumns.map(col _) :+
struct(jsonColumns map col: _*).alias("json"): _*
)
val newSchema = StructType(combined.schema.fields map {
case StructField("json", _, _, _) => StructField("json", StringType)
case s => s
})
spark.read.schema(newSchema).json(combined.toJSON.rdd)
推荐阅读
- opencv - 如何获取轮廓堆图的中点
- swift - 如何为图像添加恒定角速度?
- docker - 运行从 Dockerfile 构建的 docker 映像时无法连接其他服务
- amazon-web-services - 无法使用 aws_s3.query_export_to_s3 函数导出带有“where”子句的 AWS RDS Postgres 表
- vue.js - 如何在 Vue.js 中获取正确的详细调试消息?
- tiptap - 我的tiptap bubble_menu 可以工作,但在单击菜单项时它也会提交表单
- javascript - 单击“再次”后,JavaScript WebApp 失去形状
- jquery - 将脚本动态添加到父页面时无法打开对话框
- r - 列未与 ggplot geom_col 中的数据对齐
- c++ - 如何通过构造函数通过成员初始化列表初始化C风格的char数组和int数组?