首页 > 解决方案 > 从 json 模式构建 spark 模式

问题描述

我正在尝试构建一个希望在创建数据框时明确提供的 spark 模式,我可以使用下面的方法生成 json 模式

from pyspark.sql.types import StructType
# Save schema from the original DataFrame into json:
schema_json = df.schema.json()

这给了我

{"fields":[{"metadata":{},"name":"cloud_events_version","nullable":true,"type":"string"},{"metadata":{},"name":"data","nullable":true,"type":{"fields":[{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"member_role","nullable":true,"type":"string"},{"metadata":{},"name":"reg_source_product","nullable":true,"type":"string"},{"metadata":{},"name":"school_type","nullable":true,"type":"string"},{"metadata":{},"name":"year_in_college","nullable":true,"type":"long"}],"type":"struct"}},{"metadata":{},"name":"event_time","nullable":true,"type":"string"},{"metadata":{},"name":"event_type","nullable":true,"type":"string"},{"metadata":{},"name":"event_type_version","nullable":true,"type":"string"},{"metadata":{},"name":"event_validated_ts","nullable":true,"type":"string"},{"metadata":{},"name":"event_validation_status","nullable":true,"type":"string"},{"metadata":{},"name":"extensions","nullable":true,"type":{"fields":[{"metadata":{},"name":"client_common","nullable":true,"type":{"fields":[{"metadata":{},"name":"adobe_mcid","nullable":true,"type":"string"},{"metadata":{},"name":"adobe_sdid","nullable":true,"type":"string"},{"metadata":{},"name":"auth_state","nullable":true,"type":"string"},{"metadata":{},"name":"uuid","nullable":true,"type":"string"},{"metadata":{},"name":"client_experiments","nullable":true,"type":"string"},{"metadata":{},"name":"client_ip_address","nullable":true,"type":"string"},{"metadata":{},"name":"device_id","nullable":true,"type":"string"},{"metadata":{},"name":"page_name","nullable":true,"type":"string"},{"metadata":{},"name":"referral_url","nullable":true,"type":"string"},{"metadata":{},"name":"url","nullable":true,"type":"string"},{"metadata":{},"name":"user_agent","nullable":true,"type":"string"},{"metadata":{},"name":"uvn","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"source","nullable":true,"type":"string"},{"metadata":{},"name":"validated_message","nullable":true,"type":"string"},{"metadata":{},"name":"year","nullable":true,"type":"integer"},{"metadata":{},"name":"mon","nullable":true,"type":"integer"},{"metadata":{},"name":"day","nullable":true,"type":"integer"},{"metadata":{},"name":"hour","nullable":true,"type":"integer"}],"type":"struct"}

但为此我需要解析数据帧,这需要一些时间,我试图避免

我可以做的一件事是从我们内部拥有的目录中获取所需的模式。这给出了类似的东西

[{u'Name': u'cloud_events_version', u'Type': u'string'},
 {u'Name': u'event_type', u'Type': u'string'},
 {u'Name': u'event_time', u'Type': u'string'},
 {u'Name': u'data', u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
 {u'Name': u'source', u'Type': u'string'},
 {u'Name': u'extensions', u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
 {u'Name': u'event_type_version', u'Type': u'string'},
 {u'Name': u'event_validation_status', u'Type': u'string'},
 {u'Name': u'event_validated_ts', u'Type': u'string'},
 {u'Name': u'validated_message', u'Type': u'string'}]

我正在尝试编写一个生成上述 json 的递归 python 查询。当类型为字符串时,逻辑是遍历这个 dict 列表并将名称和类型分配给这个字典

{"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}

但是当类型是结构时,它会创建结构所有元素的字典列表并将其分配给类型并递归执行,直到找不到任何结构。

我所能召集的就是

def structRecursive(columnName,columnType):
    if "struct" not in columnType:
        ColumnDict = {"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
    else:
        structColumnList = []
        structColumnDict = {
            'metadata': {},
            'name': columnName,
            'nullable': True,
            'type': {'fields': structColumnList, 'type': 'struct'}
        }
        if columnType.count('struct<')==1:
            structCol = columnName
            structColList = columnType.encode('utf-8').replace('struct<',
                    '').replace('>', '').split(',')
            for item in structColList:
                fieldName = item.split(':')[0]
                dataType = item.split(':')[1]
                nodeDict = {}
                nodeDict['metadata'] = {}
                nodeDict['name'] = '{}'.format(fieldName)
                nodeDict['nullable'] = True
                nodeDict['type'] = '{}'.format(dataType)
                structColumnList.append(nodeDict)
        else:
            columnName = columnType.replace('struct<','',1).replace('>','').split(':')[0]
            columnType = columnType.split("{}:".format(columnName),1)[1].replace('>','',1)
        return structColumnDict

MainStructList = []
MainStructDict = {'fields': MainStructList, 'type': 'struct'}
for item in ListOfDict :
    columnName = item['Name'].encode('utf-8')
    columnType = item['Type'].encode('utf-8')
    MainStructList.append(structRecursive(columnName,columnType))

当然,这并没有给出预期的结果。很想在这里得到一些建议。

标签: pythonapache-sparkpyspark

解决方案


如果我的问题正确,您想解析列列表并将其转换为描述具有复杂类型的模式的字典。困难的部分是解析表示复杂类型的字符串。首先,我们需要一个从列定义中提取结构条目的方法:

def extract_struct(text):
    stop = 7
    flag = 1
    for c in text[7:]:
        stop += 1
        if c == "<":
            flag += 1
        if c == ">":
            flag -= 1
        if flag == 0:
            return text[:stop], text[stop:]

这将返回提取的结构和列定义中的剩余文本。例如

extract_struct("struct<a:int,b:double>,c:string")

将返回

("struct<a:int,d:double>", "c:string").

其次,我们需要遍历每个列类型并获取结构条目的定义:

def parse(s, node):
    while s != '':
        # Strip column name
        col_name = s.partition(':')[0]
        s = s.partition(':')[2]

        # If column type is struct, parse it as well
        if s.startswith('struct'):
            col_type, s = extract_struct(s)
            node[col_name] = {}
            parse(col_type[7:-1], node[col_name])
        else:
            # Just add column definition
            col_type = s.partition(',')[0]
            node[col_name] = {
                "metadata": {},
                "name": col_name,
                "nullable": True,
                "type": col_type
            }

        # Go to next entry
        s = s.partition(',')[2]

如果列类型是简单的,上述方法只是在模式树中的当前节点添加一个新列,否则它会提取名称和结构并递归遍历结构的子条目。现在我们只需要遍历每一列并解析它们。因此,在用一种方法总结上述内容之后:

def build(columns):
    def extract_struct(text):
        stop = 7
        flag = 1
        for c in text[7:]:
            stop += 1
            if c == '<':
                flag += 1
            if c == '>':
                flag -= 1
            if flag == 0:
                return text[:stop], text[stop:]

    def parse(s, node):
        while s != '':
            # Strip column name
            col_name = s.partition(':')[0]
            s = s.partition(':')[2]

            # If column type is struct, parse it as well
            if s.startswith('struct'):
                col_type, s = extract_struct(s)
                node[col_name] = {}
                parse(col_type[7:-1], node[col_name])
            else:
                # Just add column definition
                col_type = s.partition(',')[0]
                node[col_name] = {
                    "metadata": {},
                    "name": col_name,
                    "nullable": True,
                    "type": col_type
                }

            # Go to next entry
            s = s.partition(',')[2]

    schema = {}
    for column in columns:
        parse("{}:{}".format(column['Name'], column['Type']), schema)
    return schema

现在,如果您在示例列表上运行它,您会得到以下字典(很容易转换为列列表,但顺序并不重要):

{
  "cloud_events_version": {
    "nullable": true, 
    "type": "string", 
    "name": "cloud_events_version", 
    "metadata": {}
  }, 
  "event_type": {
    "nullable": true, 
    "type": "string", 
    "name": "event_type", 
    "metadata": {}
  }, 
  "event_time": {
    "nullable": true, 
    "type": "string", 
    "name": "event_time", 
    "metadata": {}
  }, 
  "event_validated_ts": {
    "nullable": true, 
    "type": "string", 
    "name": "event_validated_ts", 
    "metadata": {}
  }, 
  "event_type_version": {
    "nullable": true, 
    "type": "string", 
    "name": "event_type_version", 
    "metadata": {}
  }, 
  "source": {
    "nullable": true, 
    "type": "string", 
    "name": "source", 
    "metadata": {}
  }, 
  "extensions": {
    "client_common": {
      "adobe_sdid": {
        "nullable": true, 
        "type": "string", 
        "name": "adobe_sdid", 
        "metadata": {}
      }, 
      "auth_state": {
        "nullable": true, 
        "type": "string", 
        "name": "auth_state", 
        "metadata": {}
      }, 
      "client_ip_address": {
        "nullable": true, 
        "type": "string", 
        "name": "client_ip_address", 
        "metadata": {}
      }, 
      "url": {
        "nullable": true, 
        "type": "string", 
        "name": "url", 
        "metadata": {}
      }, 
      "client_experiments": {
        "nullable": true, 
        "type": "string", 
        "name": "client_experiments", 
        "metadata": {}
      }, 
      "referral_url": {
        "nullable": true, 
        "type": "string", 
        "name": "referral_url", 
        "metadata": {}
      }, 
      "page_name": {
        "nullable": true, 
        "type": "string", 
        "name": "page_name", 
        "metadata": {}
      }, 
      "user_agent": {
        "nullable": true, 
        "type": "string", 
        "name": "user_agent", 
        "metadata": {}
      }, 
      "uvn": {
        "nullable": true, 
        "type": "string", 
        "name": "uvn", 
        "metadata": {}
      }, 
      "chegg_uuid": {
        "nullable": true, 
        "type": "string", 
        "name": "chegg_uuid", 
        "metadata": {}
      }, 
      "adobe_mcid": {
        "nullable": true, 
        "type": "string", 
        "name": "adobe_mcid", 
        "metadata": {}
      }, 
      "device_id": {
        "nullable": true, 
        "type": "string", 
        "name": "device_id", 
        "metadata": {}
      }
    }
  }, 
  "validated_message": {
    "nullable": true, 
    "type": "string", 
    "name": "validated_message", 
    "metadata": {}
  }, 
  "event_validation_status": {
    "nullable": true, 
    "type": "string", 
    "name": "event_validation_status", 
    "metadata": {}
  }, 
  "data": {
    "school_type": {
      "nullable": true, 
      "type": "string", 
      "name": "school_type", 
      "metadata": {}
    }, 
    "reg_source_product": {
      "nullable": true, 
      "type": "string", 
      "name": "reg_source_product", 
      "metadata": {}
    }, 
    "member_role": {
      "nullable": true, 
      "type": "string", 
      "name": "member_role", 
      "metadata": {}
    }, 
    "email": {
      "nullable": true, 
      "type": "string", 
      "name": "email", 
      "metadata": {}
    }, 
    "year_in_college": {
      "nullable": true, 
      "type": "int", 
      "name": "year_in_college", 
      "metadata": {}
    }
  }
}

最后,请注意这仅适用于简单类型和struct(不适用于arrayormap类型)。但它也很容易扩展到其他复杂类型。


推荐阅读