python-3.x - 访问自定义架构文件并将架构强加到 pyspark 数据框时出错
问题描述
我在云存储中定义了一个模式。在 pyspark 数据框中读取数据时,我想强加相同的模式。
Schema文件格式为schema.txt(放在云桶中)
[StructField('_corrupt_record', StringType(), True),StructField('id', StructType([ StructField('_data', StringType(), True) ])), StructField('clusterTime', StructType([ StructField('_timestamp' , StructType([ StructField('i',IntegerType(), True)]))])),StructField('fullDocument', StructType([StructField('seg' , StructType([ StructField('ap_lan',StringType(), True),StructField('mcd',IntegerType(), True),StructField('dd',StringType(), True), StructField('fgr',StringType(), True),StructField('na',StringType(), True)]))])),StructField('nh', StringType(), True)]
代码如下:
import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)
spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)
def get_configs():
print("Inside get configs ")
with open("config.json") as json_data_file:
data = json.load(json_data_file)
input_schema = get_schema(data)
return input_schema
def get_schema(data):
"""This function loads the given schema of all the events from cloud bucket available"""
client = storage.Client()
project_name = data["env_args"]["project"]
bucket_name = data["env_args"]["bucket_name"]
file_name = data["env_args"]["file_name"]
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(file_name)
schema_blob = blob.download_as_string()
return schema_blob
#The call goes to config file which informs which bucket and file is to be read.
schema = get_configs() ```It returns datatype as bytes so I convert it to String using decode('utf-8') ```
change_struct = schema.decode('utf-8')
final_struct = StructType(fields=change_struct)
df = spark_session.read.option("multiline","true")\
.option("mode", "PERMISSIVE")\
.option("primitivesAsString", "false")\
.schema(final_struct)\
.option("columnNameOfCorruptRecord", "_corrupt_record")\
.option("allowFieldAddition","true")\
.json(x)
df.show(truncate=True)
错误信息:
final_struct = StructType(fields=final_struct1)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 492, in __init__
self.names = [f.name for f in fields]
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 492, in <listcomp>
self.names = [f.name for f in fields]
AttributeError: 'str' object has no attribute 'name'
我尝试过的事情:
当模式在代码本身中定义时,它工作正常。
import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)
spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)
final_struct = StructType([StructField('_corrupt_record', StringType(), True),StructField('id', StructType([ StructField('_data', StringType(), True) ])), StructField('clusterTime', StructType([ StructField('_timestamp' , StructType([ StructField('i',IntegerType(), True)]))])),StructField('fullDocument', StructType([StructField('seg' , StructType([ StructField('ap_lan',StringType(), True),StructField('mcd',IntegerType(), True),StructField('dd',StringType(), True), StructField('fgr',StringType(), True),StructField('na',StringType(), True)]))])),StructField('nh', StringType(), True)])
df = spark_session.read.option("multiline","true")\
.option("mode", "PERMISSIVE")\
.option("primitivesAsString", "false")\
.schema(final_struct)\
.option("columnNameOfCorruptRecord", "_corrupt_record")\
.option("allowFieldAddition","true")\
.json(x)
请帮助我了解这里出了什么问题。
更新的架构文件:将架构序列化为 json 格式,如下所示:
{"type": "struct", "fields": [{"name": "_corrupt_record", "type": "string", "nullable": true, "metadata": {}}, {"name": "id", "type": {"type": "struct", "fields": [{"name": "_data", "type": "string", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "clusterTime", "type": {"type": "struct", "fields": [{"name": "_timestamp", "type": {"type": "struct", "fields": [{"name": "i", "type": "integer", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "fullDocument", "type": {"type": "struct", "fields": [{"name": "sg", "type": {"type": "struct", "fields": [{"name": "col1", "type": "string", "nullable": true, "metadata": {}}, {"name": "col2", "type": "integer", "nullable": true, "metadata": {}}, {"name": "col3", "type": "string", "nullable": true, "metadata": {}}, {"name": "col4", "type": "string", "nullable": true, "metadata": {}}, {"name": "col5", "type": "string", "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}]}, "nullable": true, "metadata": {}}, {"name": "operationType", "type": "string", "nullable": true, "metadata": {}}]}
现在尝试使用以下代码从 json 文件格式读取自定义架构。
import json
import ast
from google.cloud import storage
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType, FloatType,LongType,BooleanType)
spark_session = SparkSession.builder.appName("Test").getOrCreate()
spark_session.conf.set('temporaryGcsBucket', bucket)
with open("/home/path/schema.json") as f:
new_schema = StructType.fromJson(json.load(f))
final_schema = new_schema.simpleString()
print(final_schema)
print('\n\n')
df = spark_session.read.option("multiline","true")\
.option("mode", "PERMISSIVE")\
.option("primitivesAsString", "false")\
.schema(final_schema)\
.option("columnNameOfCorruptRecord", "_corrupt_record")\
.option("allowFieldAddition","true")\
.json(x)
final_schema 打印为:
struct<_corrupt_record:string,id:struct<_data:string>,clusterTime:struct<_timestamp:struct<i:int>>,fullDocument:struct<sg:struct<col1:string,col2:int,col3:string,col4:string,col5:string>>,operationType:string>
但是在数据帧读取中传递相同的内容时,它会给出错误
File "/home/tes/test_driver.py", line 58, in dstream_to_rdd
.schema(final_schema)\
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 103, in schema
self._jreader = self._jreader.schema(schema)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.ParseException:
extraneous input '<' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TIME', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'ZONE', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 6)
解决方案
推荐阅读
- typescript - 带有 typescript 的 React 路由器 v5 上所需的 url 参数,可以是未定义的
- javascript - 如何在Vue中为v-for创建的每个按钮组设置accesskey?
- azure - Azure IoT Hub - 检查端口 5671 是否打开
- php - 如何从 laravel 控制器传递嵌套的 foreach 以在刀片上查看
- apache-kafka - 是否可以启用或禁用特定的 JMX 指标以使用 Jconsole 监控 Kafka 集群
- reactjs - 学习 Redux + react-redux 的一些问题
- php - 我想在 laravel 查询中进行 mysqli 查询。时间计数
- tomcat - 代理 IIS 服务器到 Tomcat 应用程序
- json - 未找到 Windows 类上的 Apache Drill GUI SQuirrel
- pdf - 如何在 acroforms 中获取字体信息?