首页 > 解决方案 > 从 Kafka Sparkstreaming 转换 JSON 中的数据类型

问题描述

我有一个 JSON,我正在使用 spark 流从 kafka 主题中读取它

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N",  "LAST_UPDATE_DATE": "05:31.7"}

我知道我们首先需要创建一个我在这里完成的模式并解析我们从 Kafka 获得的输入 json,即通过 from_json 函数的值字段。

schema = StructType([ 
    StructField("COUNTRY_REGION",StringType(),True), 
    StructField("PROVINCE_STATE",StringType(),True),
    StructField("ISO_3166_1",StringType(),True), 
    StructField("ISO_3166_2", StringType(), True), 
    StructField("DATE", DateType(), True), 
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
    StructField("PARKS_CHANGE_PERC", IntegerType(), True), 
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True), 
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),                    
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),  
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True), 
    StructField("LAST_UPDATE_DATE", DateType(), True),
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
  ])

json_edit = df.select (from_json("value",schema).alias("json"))

但是,我意识到GROCERY_AND_PHARMACY_CHANGE_PERC,PARKS_CHANGE_PERCLAST_UPDATE_DATE变为 null。

display(json_edit)

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}

我意识到这是因为原始 JSON,例如"GROCERY_AND_PHARMACY_CHANGE_PERC": "-7"它应该是"GROCERY_AND_PHARMACY_CHANGE_PERC": -7.

在将字符串解析为架构之前,有什么方法可以将字符串转换为双精度/整数?

谢谢!

标签: pythonapache-sparkpysparkspark-streaming

解决方案


您可以将三列的类型更改为StringType中的schema,解析json,然后稍后单独处理三列:

df=...

schema = StructType([ 
    StructField("COUNTRY_REGION",StringType(),True), 
    StructField("PROVINCE_STATE",StringType(),True),
    StructField("ISO_3166_1",StringType(),True), 
    StructField("ISO_3166_2", StringType(), True), 
    StructField("DATE", DateType(), True), 
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", StringType(), True), #using StringType
    StructField("PARKS_CHANGE_PERC", StringType(), True), #using StringType
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True), 
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),                    
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),  
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True), 
    StructField("LAST_UPDATE_DATE", StringType(), True), #using StringType
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
  ])
df2=df.select (from_json("value",schema).alias("json"))

解析 json 字符串后,将结构转换为单独的顶级列 ( ),使用withColumnselect("json.*")处理三列,然后在必要时使用此答案重新打包结构:

from pyspark.sql import functions as F

df2.select("json.*") \
    .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC", 
        F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("PARKS_CHANGE_PERC", 
        F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("LAST_UPDATE_DATE", 
        F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \
    .withColumn('json', F.struct(*[F.col(col) for col in df2.select('json.*').columns])) \
    .drop(*df2.select('json.*').columns) \
    .show(truncate=False)

备注:在列的示例数据中给出LAST_UPDATE_DATE了字符串"05:31.7"。上面的代码假定这是格式中的时间戳HH:mm.s。由于缺少日期,因此1970-01-01 05:31:07此示例的结果是。这可以使用to_timestamp中的另一种日期格式来解决。


推荐阅读