首页 > 解决方案 > 在 Spark 中解析类似 Json 的结构

问题描述

我有一个数据结构像这样的文件

{'analytics_category_id': 'Default', 'item_sub_type': '', 'deleted_at': '', 'product_category_id': 'Default', 'unit_price': '0.000', 'id': 'myntramprdii201907174a72fb2475d84103844083d1348acb9e', 'is_valid': True, 'measurement_uom_id': '', 'description': '', 'invoice_type': 'receivable_debit_note', 'linked_core_invoice_item_id': '', 'ref_3': '741423', 'ref_2': '6001220139357318', 'ref_1': '2022-07-04', 'tax_rate': '0.000', 'reference_id': '', 'ref_4': '', 'product_id': 'Default', 'total_amount': '0.000', 'tax_auth_party_id': '', 'item_type': 'Product', 'invoice_item_attributes': '', 'core_invoice_id': 'myntramprdi20190717a1e925911345463393bc4ac1b124dbe5', 'tax_auth_geo_id': '', 'quantity': 1}


{'analytics_category_id': 'Default', 'item_sub_type': '', 'deleted_at': '', 'product_category_id': 'Default', 'unit_price': '511.000', 'id': 'myntramprdii20190717c749a96d2e7144aea7fc5125287717f7', 'is_valid': True, 'measurement_uom_id': '', 'description': '', 'invoice_type': 'receivable_debit_note', 'linked_core_invoice_item_id': '', 'ref_3': '741424', 'ref_2': '6001220152640260', 'ref_1': '2022-07-07', 'tax_rate': '0.000', 'reference_id': '', 'ref_4': '', 'product_id': 'Default', 'total_amount': '511.000', 'tax_auth_party_id': '', 'item_type': 'Product', 'invoice_item_attributes': '', 'core_invoice_id': 'myntramprdi20190717a1e925911345463393bc4ac1b124dbe5', 'tax_auth_geo_id': '', 'quantity': 1}

我正在尝试使用 scala 在 Spark 中解析它并从中创建一个数据框,但由于结构原因无法这样做。我想过用替换'"但我的文本也可以有相同的。我需要的是数据的键值对。

到目前为止,我已经尝试过:

read.option("multiline", "true").json("s3://******/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt") 

作为多行文本,我确实获得了一些成功:

read.option("multiline", "true").textFile("s3://********/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt")

|               value|
+--------------------+
|{'analytics_categ...|
|{'analytics_categ...|
+--------------------+

我现在如何将键读取为列?

标签: jsonscalaapache-sparkapache-spark-sql

解决方案


您的问题与条目中用作布尔值的值相关联:这在需要作为布尔值True的 JSON 中无效truefalse

如果您的数据集不是很大,最简单的方法是作为文本加载,修复此问题,写入固定数据,然后以 json 格式重新打开它。

import spark.implicits._
import org.apache.spark.sql.types._

val initial = spark.read.text("s3://******/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt") 

val fixed = initial
    .select(regexp_replace('value,"\\bTrue\\b","true") as "value")
    .select(regexp_replace('value,"\\bFalse\\b","false") as "value")

fixed.write.mode("overwrite").text("/tmp/fixed_items")

val json_df = spark.read.json("/tmp/fixed_items")
json_df: org.apache.spark.sql.DataFrame = [analytics_category_id: string, core_invoice_id: string ... 23 more fields]

如果您不想使用临时数据集,可以直接使用from_json来解析固定文本值,但您需要事先在 spark 中手动定义您的模式,并在解析后进行一些列重命名:

val jsonSchema = StructType.fromDDL("`analytics_category_id` STRING,`core_invoice_id` STRING,`deleted_at` STRING,`description` STRING,`id` STRING,`invoice_item_attributes` STRING,`invoice_type` STRING,`is_valid` BOOLEAN,`item_sub_type` STRING,`item_type` STRING,`linked_core_invoice_item_id` STRING,`measurement_uom_id` STRING,`product_category_id` STRING,`product_id` STRING,`quantity` BIGINT,`ref_1` STRING,`ref_2` STRING,`ref_3` STRING,`ref_4` STRING,`reference_id` STRING,`tax_auth_geo_id` STRING,`tax_auth_party_id` STRING,`tax_rate` STRING,`total_amount` STRING,`unit_price` STRING")

val jsonParsingOptions: Map[String,String] = Map()

val json_df = fixed
     .select(from_json('value, jsonSchema, jsonParsingOptions) as "j")
     .select(jsonSchema.map(f => 'j.getItem(f.name).as(f.name)):_*)
json_df: org.apache.spark.sql.DataFrame = [analytics_category_id: string, core_invoice_id: string ... 23 more fields]

顺便说一句,从您发布的片段中,您似乎不需要该multiline选项,但如果您确实需要,则需要将该选项添加到jsonParsingOptions地图中。


推荐阅读