python - 从 Kafka Sparkstreaming 转换 JSON 中的数据类型
问题描述
我有一个 JSON,我正在使用 spark 流从 kafka 主题中读取它
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N", "LAST_UPDATE_DATE": "05:31.7"}
我知道我们首先需要创建一个我在这里完成的模式并解析我们从 Kafka 获得的输入 json,即通过 from_json 函数的值字段。
schema = StructType([
StructField("COUNTRY_REGION",StringType(),True),
StructField("PROVINCE_STATE",StringType(),True),
StructField("ISO_3166_1",StringType(),True),
StructField("ISO_3166_2", StringType(), True),
StructField("DATE", DateType(), True),
StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
StructField("PARKS_CHANGE_PERC", IntegerType(), True),
StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
StructField("LAST_UPDATE_DATE", DateType(), True),
StructField("LAST_REPORTED_FLAG", BooleanType(), True),
StructField("SUB_REGION_2", StringType(), True),
])
json_edit = df.select (from_json("value",schema).alias("json"))
但是,我意识到GROCERY_AND_PHARMACY_CHANGE_PERC
,PARKS_CHANGE_PERC
并LAST_UPDATE_DATE
变为 null。
display(json_edit)
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}
我意识到这是因为原始 JSON,例如"GROCERY_AND_PHARMACY_CHANGE_PERC": "-7"
它应该是"GROCERY_AND_PHARMACY_CHANGE_PERC": -7
.
在将字符串解析为架构之前,有什么方法可以将字符串转换为双精度/整数?
谢谢!
解决方案
您可以将三列的类型更改为StringType
中的schema
,解析json,然后稍后单独处理三列:
df=...
schema = StructType([
StructField("COUNTRY_REGION",StringType(),True),
StructField("PROVINCE_STATE",StringType(),True),
StructField("ISO_3166_1",StringType(),True),
StructField("ISO_3166_2", StringType(), True),
StructField("DATE", DateType(), True),
StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", StringType(), True), #using StringType
StructField("PARKS_CHANGE_PERC", StringType(), True), #using StringType
StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
StructField("LAST_UPDATE_DATE", StringType(), True), #using StringType
StructField("LAST_REPORTED_FLAG", BooleanType(), True),
StructField("SUB_REGION_2", StringType(), True),
])
df2=df.select (from_json("value",schema).alias("json"))
解析 json 字符串后,将结构转换为单独的顶级列 ( ),使用withColumnselect("json.*")
处理三列,然后在必要时使用此答案重新打包结构:
from pyspark.sql import functions as F
df2.select("json.*") \
.withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC",
F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \
.withColumn("PARKS_CHANGE_PERC",
F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \
.withColumn("LAST_UPDATE_DATE",
F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \
.withColumn('json', F.struct(*[F.col(col) for col in df2.select('json.*').columns])) \
.drop(*df2.select('json.*').columns) \
.show(truncate=False)
备注:在列的示例数据中给出LAST_UPDATE_DATE
了字符串"05:31.7"
。上面的代码假定这是格式中的时间戳HH:mm.s
。由于缺少日期,因此1970-01-01 05:31:07
此示例的结果是。这可以使用to_timestamp中的另一种日期格式来解决。
推荐阅读
- reactjs - 如何在屏幕中处理多个上下文状态?
- angular - 变量不会在视图中更新
- bash - Source one shell script in another gives error in MacOs
- python - How do I write a Python class that is a node in a linked list while using type hinting
- python - RASA - rasa 运行操作 - 本地主机问题
- swift - How to run Swift binaries compiled on macOS on Linux?
- flutter - PlatformException(already_active, Image picker is already active, null) on huawei y5
- delphi - Delphi XE6 crashes while installing bpl
- sql - 查询时有持续时间的多种情况(Redshift)
- jquery - 使用 jQuery 中的本地存储将数据打印到表中