apache-spark - 将 json 从 kinesis 读取到 pyspark 时出现问题
问题描述
我正在尝试从 Kinesis 读取流式 JSON 数据到 PySpark。我的 JSON 看起来像:
{'installmentNo': '10', 'loanId': '1'}
我已经指定了架构,但是当 spark 读取数据时我得到“null”。下面是代码片段。
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
fields = [
StructField("installmentNo", IntegerType(), True),
StructField("loanId", IntegerType(), True)
]
pythonSchema = StructType(fields)
kinesisDf = spark.readStream \
.format("kinesis")\
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion)\
.option("initialPosition", "latest")\
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey).load()
dataDevicesDF = kinesisDf.selectExpr("cast (data as STRING) my_json_data").select(from_json("my_json_data", pythonSchema).alias("yp_inst")).select("yp_inst.*")
display(dataDevicesDF)
输出:
但是,当我删除“from_json”部分时,我得到一个带有 JSON 字符串的列。但我想将 json 分解为特定的列并将数据作为 df 获取。有人可以建议我进行更改吗?
解决方案
架构不正确 - 您的数据是字符串,而您声明整数。
请将定义更改为
pythonSchema = StructType([
StructField("installmentNo", StringType(), True),
StructField("loanId", StringType(), True)
])
并转换输出:
from_json(
"my_json_data", pythonSchema
).cast("struct<installmentNo: integer, loanId: integer>"))
其余代码应保持原样,但为清楚起见,您可以显式设置选项(因为输入不是标准 JSON):
from_json(
"my_json_data", pythonSchema, {"allowSingleQuotes": "true"}
).cast("struct<installmentNo: integer, loanId: integer>"))
推荐阅读
- kotlin - org.koin.core.error.InstanceCreationException:无法为 [Factory:'com.pokemonsearch.presentation.search.SearchViewModel'] 创建实例
- javascript - 对数组中的数据进行分组,求和并添加子节点
- java - 每年每个季度的第一个星期五的下星期日的 Cron 作业
- python - Basin Hopping方法的编程和优化代码
- c# - 如何在 C# 中处理多个客户端 TCP/IP
- etl - 我们什么时候考虑对星型模式进行雪花化?
- html - 为什么我的页面 div 没有填满整个屏幕?
- css - 伪类 after & before 仅在具有绝对位置时显示
- python - python selenium 按标签在 div 中查找元素(没有名称、类、id 或文本)
- google-sheets - 我可以向查询函数添加三个不同的规则吗?