python - 在pyspark的数据框中构造一个嵌套的json
问题描述
我在构建以下数据时遇到了一些困难,我希望得到有关该主题的专家的帮助
我需要在 pyspark 的数据框中构造一个 json。我没有完整的架构,但它下面的嵌套结构不会改变:
import http.client conn = http.client.HTTPSConnection("xxx")
payload = ""
conn.request("GET", "xxx", payload)
res = conn.getresponse() data = res.read().decode("utf-8")
json_obj = json.loads(data)
df = json.dumps(json_obj, indent=2)
这是Json:
{ "car": {
"top1": {
"cl": [
{
"nm": "Setor A",
"prc": "40,00 %",
"tv": [
{
"logo": "https://www.test.com/ddd.jpg",
"nm": "BDFG",
"lk1": "https://www.test.com/ddd/BDFG/",
"lk2": "https://www.test-ddd.com",
"dta": [
{
"nm": "PA",
"cp": "nl",
"vl": "$ 2,50"
},
{
"nm": "FVP",
"cp": "UV",
"vl": "No"
}
],
"prc": "30,00 %"
},
{
"logo": "https://www.test.com/ccc.jpg",
"nome": "BDFH",
"lk1": "https://www.test.com/ddd/BDFH/",
"lk2": "https://www.test-ddd.com",
"dta": [
{
"nm": "PA",
"cp": "nl",
"vl": "$ 2,50"
},
{
"nm": "FVP",
"cp": "UV",
"vl": "No"
}
],
"prc": "70,00 %"
}
]
},
{
"nm": "B",
"prc": "60,00 %",
"tv": [
{
"logo": "https://www.test.com/bomm.jpg",
"nm": "BOOM",
"lk1": "https://www.test.com/ddd/BDFH/",
"lk2": "https://www.test-ddd.com",
"dta": [
{
"nm": "PA",
"cp": "nl",
"vl": "$ 2,50"
},
{
"nm": "FVP",
"cp": "UV",
"vl": "No"
}
],
"prc": "100,00 %"
}
]
}
]
},
"top2": {
"cl": [{}]
"top3": {
"cl": [{}]
}
json 文件示例
我试图以某种方式构建我的数据但没有成功:
schema = StructType(
[
StructField("car", ArrayType(StructType([
StructField("top1", ArrayType(StructType([
StructField("cl", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("prc", StringType(),True),
StructField("tv", ArrayType(StructType([
StructField("logo", StringType(),True),
StructField("nm", StringType(),True),
StructField("lk1", StringType(),True),
StructField("lk2", StringType(),True),
StructField("dta", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("cp", StringType(),True),
StructField("vl", StringType(),True)]))),
StructField("prc", StringType(),True)])))])))]))),
StructField("top2", ArrayType(StructType([
StructField("cl", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("prc", StringType(),True),
StructField("tv", ArrayType(StructType([
StructField("logo", StringType(),True),
StructField("nm", StringType(),True),
StructField("lk1", StringType(),True),
StructField("lk2", StringType(),True),
StructField("dta", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("cp", StringType(),True),
StructField("vl", StringType(),True)]))),
StructField("prc", StringType(),True)])))])))]))),
StructField("top3", ArrayType(StructType([
StructField("cl", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("prc", StringType(),True),
StructField("tv", ArrayType(StructType([
StructField("logo", StringType(),True),
StructField("nm", StringType(),True),
StructField("lk1", StringType(),True),
StructField("lk2", StringType(),True),
StructField("dta", ArrayType(StructType([
StructField("nm", StringType(),True),
StructField("cp", StringType(),True),
StructField("vl", StringType(),True)]))),
StructField("prc", StringType(),True)])))])))])))])))])
df2 = sqlContext.read.json(df, schema)
df2.printSchema()
我想改变这样的东西:
是否有任何功能可以促进这种中断和结构化这些数据?
解决方案
您可以将 JSON 文件路径或 RDD 传递给 json() 方法。
您需要使用 parallelize() 从 JSON 字符串中创建 RDD,然后将此 RDD 传递给 json()。
spark = SparkSession.builder.master("local[*]").getOrCreate()
rdd = spark.sparkContext.parallelize([json.dumps(json_obj,indent=2)])
# Schema will be inferred automatically. You can pass schema if you want.
json_df = spark.read.json(rdd)
推荐阅读
- angular - 如何获取来自角度服务的变量
- winapi - 如何安全地杀死 robocopy 进程
- node.js - Mongoose "尚未为模型 \"Designation\" 注册架构。\n使用 mongoose.model(name, schema)"
- javascript - 使用 react-router 路由后,样式的 glidejs 动态计算被破坏
- nginx - Nginx - 从多个子目录索引静态文件夹,为 GeoIp 重写
- python - 如何同时绘制多个点云python
- python-3.x - 我可以在 Azure ML 的一次训练中使用两个计算集群吗?
- javascript - 在 JS 中创建 img 时,如何在 JS 中设置它的容器大小?
- android - Android Room 一对多关系空列表
- python - 为什么此示例中的图形大小(y 轴)会波动?