首页 > 解决方案 > 如何在 Scala 中解析 JsonArray 并将它们写入 DataFrame?

问题描述

使用我的 Scala HTTP 客户端,我从 API调用中检索了JSON格式的响应。GET

我的最终目标是将此JSON内容写入存储桶,以便在运行简单爬虫AWS S3时将其作为表格使用。RedShiftAWS Glue

我的想法是解析此JSON消息并以某种方式转换为 Spark ,因此稍后我可以将其以、或其他DataFrame格式保存到我喜欢的 S3 位置。.csv.parquet

JSON 文件如下所示

{
  "response": {
    "status": "OK",
    "start_element": 0,
    "num_elements": 100,
    "categories": [
      {
        "id": 1,
        "name": "Airlines",
        "is_sensitive": false,
        "last_modified": "2010-03-19 17:48:36",
        "requires_whitelist_on_external": false,
        "requires_whitelist_on_managed": false,
        "is_brand_eligible": true,
        "requires_whitelist": false,
        "whitelist": {
          "geos": [],
          "countries_and_brands": []
        }
      },
      {
        "id": 2,
        "name": "Apparel",
        "is_sensitive": false,
        "last_modified": "2010-03-19 17:48:36",
        "requires_whitelist_on_external": false,
        "requires_whitelist_on_managed": false,
        "is_brand_eligible": true,
        "requires_whitelist": false,
        "whitelist": {
          "geos": [],
          "countries_and_brands": []
        }
      }
    ],
    "count": 148,
    "dbg_info": {
      "warnings": [],
      "version": "1.18.1621",
      "output_term": "categories"
    }
  }
}

我想映射到 Dataframe 的内容是"categories" JSON Array.

我设法以这种方式使用json4s.JsonMethods方法解析消息parse

val parsedJson = parse(request) \\ "categories"

获得以下内容:

output: org.json4s.JValue = JArray(List(JObject(List((id,JInt(1)), (name,JString(Airlines)), (is_sensitive,JBool(false)), (last_modified,JString(2010-03-19 17:48:36)), (requires_whitelist_on_external,JBool(false)), (requires_whitelist_on_managed,JBool(false)), (is_brand_eligible,JBool(true)), (requires_whitelist,JBool(false)), (whitelist,JObject(List((geos,JArray(List())), (countries_and_brands,JArray(List()))))))), JObject(List((id,JInt(2)), (name,JString(Apparel)), (is_sensitive,JBool(false)), (last_modified,JString(2010-03-19 17:48:36)), (requires_whitelist_on_external,JBool(false)), (requires_whitelist_on_managed,JBool(false)), (is_brand_eligible,JBool(true)), (requires_whitelist,JBool(false)), (whitelist,JObject(List((geos,JArray(List())), (countries_and_brands,JArray(List()))))))))

但是,我完全不知道如何进行。我什至尝试使用另一个名为 Scala 的库uJson

val json = (ujson.read(request))
val tuples = json("response")("categories").arr /* <-- categories is an array */ .map { item => 
(item("id"), item("name")) 

这次我只解析了两个字段进行测试,但这应该不会有太大变化。因此,我获得了以下结构:

tuples: scala.collection.mutable.ArrayBuffer[(ujson.Value, ujson.Value, ujson.Value, ujson.Value)] = ArrayBuffer((1,"Airlines",false,"2010-03-19 17:48:36"), (2,"Apparel",false,"2010-03-19 17:48:36"))

但是,这一次我也不知道如何继续前进,我尝试的一切都会导致错误,主要与格式不兼容有关。

请随时提出任何其他方法来实现我的目标,即使它完全改变了我的工作流程。我宁愿好好学点东西。谢谢

标签: jsonscalaapache-sparkapache-spark-sql

解决方案


我们可以使用以下代码将 JSON 转换为 Spark Dataframe/Dataset

val df00 = spark.read.option("multiline","true").json(Seq(JSON_OUTPUT).toDS())


推荐阅读