首页 > 解决方案 > 为 Spark 编写 JSON 模式的教程

问题描述

我在表列中有一个 JSON 字段作为字符串,我需要使用 Spark 解析和分解。

编辑:让我的问题更容易理解:我的数据在 JSON 数组中。如何为此类数据编写架构?以下是此类 JSON 的示例:

[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"3"}]

这就是 JSON 的样子,包裹在 [] 中:

[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType": 0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price": 140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType ":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086- a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null, "UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate ":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null}, "IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro 支持服务","VendorSubscriptionKey":null,"Name":"Quattro支持服务","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price": 20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired" :null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime": null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 企业 E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey": null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8 -42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":空,“服务”:[{“ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0 ,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0," DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText": null,"选项编号":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey": "91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency" :1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null ,"成本":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion" :null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null, "CustomerSettings":null,"ResellerSettings":null}]"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings": null,"ResellerSettings":null}]"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings": null,"ResellerSettings":null}]

我手动编写了这个架构:

StructType([
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())   ])

它不会将我的字段解析为数组,我在新字段中得到 Null(底部的代码)。

+---+--------------------+--------------+
|key|               value|value_w_schema|
+---+--------------------+--------------+
| 10|[{"ProductName":"...|          null|
+---+--------------------+--------------+

但是,如果我从字符串字段中删除 [] 并保留 1 个 json,包裹在 {} 中,那么架构可以工作。我应该将写入的模式包装到另一个结构或数组中吗?有人可以指出我编写这些模式的好教程吗?

可重现的代码:

cart_CartProducts_schema = StructType([
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())
   ])

data = [(10,'''[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086-a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello1","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello2","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 Enterprise E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello3","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey":"91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, cart_CartProducts_schema))
#df2.printSchema()
df2.show()

标签: jsonapache-sparkschemaexplode

解决方案


我可以使用这里给出的答案来解决它:(第二个): https ://stackoverflow.com/a/58037383/4253760

我的问题最好表述为:如何为 JSON 数组编写模式?

问题 JSON,而不是 JSON 数组:(现在我想我将能够为我的复杂 JSON 编写模式。

[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]

解决方案:

trial_sch = ArrayType(StructType([
            StructField("CategoryName", StringType()),
            StructField("CategoryTitle", StringType()),
            StructField("CategoryLevels", StringType())
])
)

data = [(10,'''[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, trial_sch))  #Change schema here
#df2.printSchema()
df2.select("value_w_schema").show(truncate=False)

推荐阅读