python - 从 json 模式构建 spark 模式
问题描述
我正在尝试构建一个希望在创建数据框时明确提供的 spark 模式,我可以使用下面的方法生成 json 模式
from pyspark.sql.types import StructType
# Save schema from the original DataFrame into json:
schema_json = df.schema.json()
这给了我
{"fields":[{"metadata":{},"name":"cloud_events_version","nullable":true,"type":"string"},{"metadata":{},"name":"data","nullable":true,"type":{"fields":[{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"member_role","nullable":true,"type":"string"},{"metadata":{},"name":"reg_source_product","nullable":true,"type":"string"},{"metadata":{},"name":"school_type","nullable":true,"type":"string"},{"metadata":{},"name":"year_in_college","nullable":true,"type":"long"}],"type":"struct"}},{"metadata":{},"name":"event_time","nullable":true,"type":"string"},{"metadata":{},"name":"event_type","nullable":true,"type":"string"},{"metadata":{},"name":"event_type_version","nullable":true,"type":"string"},{"metadata":{},"name":"event_validated_ts","nullable":true,"type":"string"},{"metadata":{},"name":"event_validation_status","nullable":true,"type":"string"},{"metadata":{},"name":"extensions","nullable":true,"type":{"fields":[{"metadata":{},"name":"client_common","nullable":true,"type":{"fields":[{"metadata":{},"name":"adobe_mcid","nullable":true,"type":"string"},{"metadata":{},"name":"adobe_sdid","nullable":true,"type":"string"},{"metadata":{},"name":"auth_state","nullable":true,"type":"string"},{"metadata":{},"name":"uuid","nullable":true,"type":"string"},{"metadata":{},"name":"client_experiments","nullable":true,"type":"string"},{"metadata":{},"name":"client_ip_address","nullable":true,"type":"string"},{"metadata":{},"name":"device_id","nullable":true,"type":"string"},{"metadata":{},"name":"page_name","nullable":true,"type":"string"},{"metadata":{},"name":"referral_url","nullable":true,"type":"string"},{"metadata":{},"name":"url","nullable":true,"type":"string"},{"metadata":{},"name":"user_agent","nullable":true,"type":"string"},{"metadata":{},"name":"uvn","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"source","nullable":true,"type":"string"},{"metadata":{},"name":"validated_message","nullable":true,"type":"string"},{"metadata":{},"name":"year","nullable":true,"type":"integer"},{"metadata":{},"name":"mon","nullable":true,"type":"integer"},{"metadata":{},"name":"day","nullable":true,"type":"integer"},{"metadata":{},"name":"hour","nullable":true,"type":"integer"}],"type":"struct"}
但为此我需要解析数据帧,这需要一些时间,我试图避免
我可以做的一件事是从我们内部拥有的目录中获取所需的模式。这给出了类似的东西
[{u'Name': u'cloud_events_version', u'Type': u'string'},
{u'Name': u'event_type', u'Type': u'string'},
{u'Name': u'event_time', u'Type': u'string'},
{u'Name': u'data', u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
{u'Name': u'source', u'Type': u'string'},
{u'Name': u'extensions', u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
{u'Name': u'event_type_version', u'Type': u'string'},
{u'Name': u'event_validation_status', u'Type': u'string'},
{u'Name': u'event_validated_ts', u'Type': u'string'},
{u'Name': u'validated_message', u'Type': u'string'}]
我正在尝试编写一个生成上述 json 的递归 python 查询。当类型为字符串时,逻辑是遍历这个 dict 列表并将名称和类型分配给这个字典
{"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
但是当类型是结构时,它会创建结构所有元素的字典列表并将其分配给类型并递归执行,直到找不到任何结构。
我所能召集的就是
def structRecursive(columnName,columnType):
if "struct" not in columnType:
ColumnDict = {"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
else:
structColumnList = []
structColumnDict = {
'metadata': {},
'name': columnName,
'nullable': True,
'type': {'fields': structColumnList, 'type': 'struct'}
}
if columnType.count('struct<')==1:
structCol = columnName
structColList = columnType.encode('utf-8').replace('struct<',
'').replace('>', '').split(',')
for item in structColList:
fieldName = item.split(':')[0]
dataType = item.split(':')[1]
nodeDict = {}
nodeDict['metadata'] = {}
nodeDict['name'] = '{}'.format(fieldName)
nodeDict['nullable'] = True
nodeDict['type'] = '{}'.format(dataType)
structColumnList.append(nodeDict)
else:
columnName = columnType.replace('struct<','',1).replace('>','').split(':')[0]
columnType = columnType.split("{}:".format(columnName),1)[1].replace('>','',1)
return structColumnDict
MainStructList = []
MainStructDict = {'fields': MainStructList, 'type': 'struct'}
for item in ListOfDict :
columnName = item['Name'].encode('utf-8')
columnType = item['Type'].encode('utf-8')
MainStructList.append(structRecursive(columnName,columnType))
当然,这并没有给出预期的结果。很想在这里得到一些建议。
解决方案
如果我的问题正确,您想解析列列表并将其转换为描述具有复杂类型的模式的字典。困难的部分是解析表示复杂类型的字符串。首先,我们需要一个从列定义中提取结构条目的方法:
def extract_struct(text):
stop = 7
flag = 1
for c in text[7:]:
stop += 1
if c == "<":
flag += 1
if c == ">":
flag -= 1
if flag == 0:
return text[:stop], text[stop:]
这将返回提取的结构和列定义中的剩余文本。例如
extract_struct("struct<a:int,b:double>,c:string")
将返回
("struct<a:int,d:double>", "c:string").
其次,我们需要遍历每个列类型并获取结构条目的定义:
def parse(s, node):
while s != '':
# Strip column name
col_name = s.partition(':')[0]
s = s.partition(':')[2]
# If column type is struct, parse it as well
if s.startswith('struct'):
col_type, s = extract_struct(s)
node[col_name] = {}
parse(col_type[7:-1], node[col_name])
else:
# Just add column definition
col_type = s.partition(',')[0]
node[col_name] = {
"metadata": {},
"name": col_name,
"nullable": True,
"type": col_type
}
# Go to next entry
s = s.partition(',')[2]
如果列类型是简单的,上述方法只是在模式树中的当前节点添加一个新列,否则它会提取名称和结构并递归遍历结构的子条目。现在我们只需要遍历每一列并解析它们。因此,在用一种方法总结上述内容之后:
def build(columns):
def extract_struct(text):
stop = 7
flag = 1
for c in text[7:]:
stop += 1
if c == '<':
flag += 1
if c == '>':
flag -= 1
if flag == 0:
return text[:stop], text[stop:]
def parse(s, node):
while s != '':
# Strip column name
col_name = s.partition(':')[0]
s = s.partition(':')[2]
# If column type is struct, parse it as well
if s.startswith('struct'):
col_type, s = extract_struct(s)
node[col_name] = {}
parse(col_type[7:-1], node[col_name])
else:
# Just add column definition
col_type = s.partition(',')[0]
node[col_name] = {
"metadata": {},
"name": col_name,
"nullable": True,
"type": col_type
}
# Go to next entry
s = s.partition(',')[2]
schema = {}
for column in columns:
parse("{}:{}".format(column['Name'], column['Type']), schema)
return schema
现在,如果您在示例列表上运行它,您会得到以下字典(很容易转换为列列表,但顺序并不重要):
{
"cloud_events_version": {
"nullable": true,
"type": "string",
"name": "cloud_events_version",
"metadata": {}
},
"event_type": {
"nullable": true,
"type": "string",
"name": "event_type",
"metadata": {}
},
"event_time": {
"nullable": true,
"type": "string",
"name": "event_time",
"metadata": {}
},
"event_validated_ts": {
"nullable": true,
"type": "string",
"name": "event_validated_ts",
"metadata": {}
},
"event_type_version": {
"nullable": true,
"type": "string",
"name": "event_type_version",
"metadata": {}
},
"source": {
"nullable": true,
"type": "string",
"name": "source",
"metadata": {}
},
"extensions": {
"client_common": {
"adobe_sdid": {
"nullable": true,
"type": "string",
"name": "adobe_sdid",
"metadata": {}
},
"auth_state": {
"nullable": true,
"type": "string",
"name": "auth_state",
"metadata": {}
},
"client_ip_address": {
"nullable": true,
"type": "string",
"name": "client_ip_address",
"metadata": {}
},
"url": {
"nullable": true,
"type": "string",
"name": "url",
"metadata": {}
},
"client_experiments": {
"nullable": true,
"type": "string",
"name": "client_experiments",
"metadata": {}
},
"referral_url": {
"nullable": true,
"type": "string",
"name": "referral_url",
"metadata": {}
},
"page_name": {
"nullable": true,
"type": "string",
"name": "page_name",
"metadata": {}
},
"user_agent": {
"nullable": true,
"type": "string",
"name": "user_agent",
"metadata": {}
},
"uvn": {
"nullable": true,
"type": "string",
"name": "uvn",
"metadata": {}
},
"chegg_uuid": {
"nullable": true,
"type": "string",
"name": "chegg_uuid",
"metadata": {}
},
"adobe_mcid": {
"nullable": true,
"type": "string",
"name": "adobe_mcid",
"metadata": {}
},
"device_id": {
"nullable": true,
"type": "string",
"name": "device_id",
"metadata": {}
}
}
},
"validated_message": {
"nullable": true,
"type": "string",
"name": "validated_message",
"metadata": {}
},
"event_validation_status": {
"nullable": true,
"type": "string",
"name": "event_validation_status",
"metadata": {}
},
"data": {
"school_type": {
"nullable": true,
"type": "string",
"name": "school_type",
"metadata": {}
},
"reg_source_product": {
"nullable": true,
"type": "string",
"name": "reg_source_product",
"metadata": {}
},
"member_role": {
"nullable": true,
"type": "string",
"name": "member_role",
"metadata": {}
},
"email": {
"nullable": true,
"type": "string",
"name": "email",
"metadata": {}
},
"year_in_college": {
"nullable": true,
"type": "int",
"name": "year_in_college",
"metadata": {}
}
}
}
最后,请注意这仅适用于简单类型和struct
(不适用于array
ormap
类型)。但它也很容易扩展到其他复杂类型。
推荐阅读
- ruby - ActiveRecord::RecordNotFound 在 StaticPagesController#home
- android - 覆盖后退按钮 wix 反应原生导航 V2?
- javascript - 关于 Maps Javascript API 上的地图移动计费
- javascript - 如何使用javascript查找和计算数组中的重复整数
- c# - 文件传输过程中视频文件元数据信息丢失
- c# - 我需要将 .json 转换为数组
- python - 限制核心数 Dask 不工作?
- javascript - 如何在分配变量时使用 async/await?
- c# - 将相关的第三个类从第三个 SQL 表传递到具有 ViewModel 的视图中
- android - arduino中气体检查器MQ-135的值十进制单位是多少?