Error when accessing a custom schema file and imposing the schema on a PySpark dataframe

Problem Description

I have a schema defined in cloud storage. When reading the data into a PySpark dataframe, I want to impose that same schema on it.

The schema file is schema.txt (placed in a cloud bucket):

[StructField('_corrupt_record', StringType(), True),StructField('id', StructType([ StructField('_data', StringType(), True) ])), StructField('clusterTime', StructType([   StructField('_timestamp' , StructType([ StructField('i',IntegerType(), True)]))])),StructField('fullDocument', StructType([StructField('seg' , StructType([ StructField('ap_lan',StringType(), True),StructField('mcd',IntegerType(), True),StructField('dd',StringType(), True), StructField('fgr',StringType(), True),StructField('na',StringType(), True)]))])),StructField('nh', StringType(), True)]

The code is as follows:

import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)

spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)

def get_configs():
    print("Inside get configs ")
    with open("config.json") as json_data_file:
        data = json.load(json_data_file)
    input_schema = get_schema(data)
    return input_schema
  
def get_schema(data):
    """This function loads the given schema of all the events from cloud bucket available"""
    client = storage.Client()
    project_name = data["env_args"]["project"]
    bucket_name = data["env_args"]["bucket_name"]
    file_name = data["env_args"]["file_name"]
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(file_name)
    schema_blob = blob.download_as_string()
    return schema_blob

# The call reads the config file, which specifies the bucket and file to read.
schema = get_configs()
# get_configs() returns bytes, so convert it to a string with decode('utf-8')

change_struct = schema.decode('utf-8')
final_struct = StructType(fields=change_struct)

df = spark_session.read.option("multiline","true")\
                   .option("mode", "PERMISSIVE")\
                   .option("primitivesAsString", "false")\
                   .schema(final_struct)\
                   .option("columnNameOfCorruptRecord", "_corrupt_record")\
                   .option("allowFieldAddition","true")\
                   .json(x)

df.show(truncate=True)

Error message:

    final_struct = StructType(fields=final_struct1)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 492, in __init__
    self.names = [f.name for f in fields]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 492, in <listcomp>
    self.names = [f.name for f in fields]
AttributeError: 'str' object has no attribute 'name'
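The traceback points at the root cause: schema.decode('utf-8') yields a plain Python str, while StructType(fields=...) expects an iterable of StructField objects, so iterating the string hands it single characters that have no .name attribute. Below is a minimal, hedged sketch of one way to rebuild the fields from this Python-literal-style text; it assumes the file contains exactly a valid expression over the pyspark type constructors, and eval should only be used on trusted bucket contents (serializing the schema as JSON, as in the update further down, is the safer approach):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# change_struct is the decoded text of schema.txt, e.g.
# "[StructField('nh', StringType(), True), ...]"
# Evaluate it with only the needed type constructors in scope.
# Caution: eval executes arbitrary code; use it only with trusted schema files.
allowed_names = {
    "StructField": StructField,
    "StructType": StructType,
    "StringType": StringType,
    "IntegerType": IntegerType,
}
fields = eval(change_struct, {"__builtins__": {}}, allowed_names)  # list of StructField
final_struct = StructType(fields=fields)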

What I have tried:

When the schema is defined in the code itself, it works fine.

import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)

spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)

final_struct = StructType([StructField('_corrupt_record', StringType(), True),StructField('id', StructType([ StructField('_data', StringType(), True) ])), StructField('clusterTime', StructType([   StructField('_timestamp' , StructType([ StructField('i',IntegerType(), True)]))])),StructField('fullDocument', StructType([StructField('seg' , StructType([ StructField('ap_lan',StringType(), True),StructField('mcd',IntegerType(), True),StructField('dd',StringType(), True), StructField('fgr',StringType(), True),StructField('na',StringType(), True)]))])),StructField('nh', StringType(), True)])

df = spark_session.read.option("multiline","true")\
                   .option("mode", "PERMISSIVE")\
                   .option("primitivesAsString", "false")\
                   .schema(final_struct)\
                   .option("columnNameOfCorruptRecord", "_corrupt_record")\
                   .option("allowFieldAddition","true")\
                   .json(x)

Please help me understand what is going wrong here.

Updated schema file: the schema is now serialized to JSON format, as shown below:

{"type": "struct", "fields": [{"name": "_corrupt_record", "type": "string", "nullable": true, "metadata": {}}, {"name": "id", "type": {"type": "struct", "fields": [{"name": "_data", "type": "string", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "clusterTime", "type": {"type": "struct", "fields": [{"name": "_timestamp", "type": {"type": "struct", "fields": [{"name": "i", "type": "integer", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "fullDocument", "type": {"type": "struct", "fields": [{"name": "sg", "type": {"type": "struct", "fields": [{"name": "col1", "type": "string", "nullable": true, "metadata": {}}, {"name": "col2", "type": "integer", "nullable": true, "metadata": {}}, {"name": "col3", "type": "string", "nullable": true, "metadata": {}}, {"name": "col4", "type": "string", "nullable": true, "metadata": {}}, {"name": "col5", "type": "string", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "operationType", "type": "string", "nullable": true, "metadata": {}}]}

Now trying to read the custom schema from the JSON file with the following code:

import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)

spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)

with open("/home/path/schema.json") as f:
     new_schema = StructType.fromJson(json.load(f))
     final_schema = new_schema.simpleString()
     print(final_schema)
print('\n\n')
df = spark_session.read.option("multiline","true")\
                   .option("mode", "PERMISSIVE")\
                   .option("primitivesAsString", "false")\
                   .schema(final_schema)\
                   .option("columnNameOfCorruptRecord", "_corrupt_record")\
                   .option("allowFieldAddition","true")\
                   .json(x)

final_schema prints as:

struct<_corrupt_record:string,id:struct<_data:string>,clusterTime:struct<_timestamp:struct<i:int>>,fullDocument:struct<sg:struct<col1:string,col2:int,col3:string,col4:string,col5:string>>,operationType:string>

But when the same value is passed to the dataframe read, it gives an error:


  File "/home/tes/test_driver.py", line 58, in dstream_to_rdd
    .schema(final_schema)\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 103, in schema
    self._jreader = self._jreader.schema(schema)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.ParseException: 
extraneous input '<' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TIME', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'ZONE', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 6)
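When .schema() is given a string, it tries to parse it as a DDL-style column list, not the struct<...> catalog form that simpleString() returns, which is why the parser stops at the '<'. A small illustration with made-up columns:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("count", IntegerType(), True),
])

print(schema.simpleString())   # struct<id:string,count:int> -> triggers the ParseException above
ddl = "id STRING, count INT"   # a DDL column list is what .schema() parses when given a str
# Passing the StructType object itself avoids string parsing entirely.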

Tags: python-3.x, pyspark, apache-spark-sql

Solution

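A minimal sketch of a fix, assuming the JSON schema file from the update above; the bucket name, file name and input path are illustrative placeholders. The key changes are rebuilding the schema with StructType.fromJson() and handing the resulting StructType object to .schema() directly, rather than its simpleString() form:

import json
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark_session = SparkSession.builder.appName("Test").getOrCreate()

def load_schema_from_gcs(bucket_name, file_name):
    """Download the JSON-serialized schema and rebuild it as a StructType."""
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(file_name)
    schema_json = blob.download_as_string().decode("utf-8")
    return StructType.fromJson(json.loads(schema_json))

# Placeholder names; substitute the real bucket, schema file and data path
final_struct = load_schema_from_gcs("my-bucket", "schema.json")
input_path = "gs://my-bucket/data/*.json"

df = spark_session.read.option("multiline", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(final_struct) \
    .json(input_path)
df.show(truncate=False)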

