python - 如何在 Spark 中使用用户定义模式创建 DataFrame
问题描述
我想在 Python 中使用指定的模式在 DataFrame 上创建。这是我到目前为止所做的过程。
我有 Sample.parm 文件,我在其中定义了如下模式:Account_type,string,True
我编写了 python 脚本 sample.py 来读取 sample.parm 文件,根据 sample.parm 文件生成模式,然后根据用户定义的模式生成数据框。
d
def schema():
with open('<path>/sample.parm','r') as parm_file:
reader=csv.reader(parm_file,delimiter=",")
filteredSchema = []
for fieldName in reader:
if fieldName[1].lower() == "decimal":
filteredSchema.append([fieldName[0], DecimalType(),fieldName[2]])
elif fieldName[1].lower() == "string":
filteredSchema.append([fieldName[0], StringType(),fieldName[2]])
elif fieldName[1].lower() == "integer":
filteredSchema.append([fieldName[0], IntegerType(),fieldName[2]])
elif fieldName[1].lower() == "date":
filteredSchema.append([fieldName[0], DateType(),fieldName[2]])
elif fieldName[1].lower() == "byte":
filteredSchema.append([fieldName[0], ByteType(),fieldName[2]])
elif fieldName[1].lower() == "boolean":
filteredSchema.append([fieldName[0], BooleanType(),fieldName[2]])
elif fieldName[1].lower() == "short":
filteredSchema.append([fieldName[0], ShortType(),fieldName[2]])
elif fieldName[1].lower() == "long":
filteredSchema.append([fieldName[0], LongType(),fieldName[2]])
elif fieldName[1].lower() == "double":
filteredSchema.append([fieldName[0], DoubleType(),fieldName[2]])
elif fieldName[1].lower() == "float":
filteredSchema.append([fieldName[0], FloatType(),fieldName[2]])
elif fieldName[1].lower() == "timestamp":
filteredSchema.append([fieldName[0], TimestampType(),fieldName[2]])
struct_schema = [StructField(line[0], line[1], line[2]) for line in filteredSchema]
schema=StructTpe(struct_schema)
return schema
def create_dataframe(path):
val=spark.read.schema(schema()).csv(path, sep='\t')
print(val.take(1))
但出现如下错误:pyspark.sql.utils.IllegalArgumentException: u'Failed to convert the JSON string \'{"metadata":{},"name":"account_type","nullable":"True","type":"string"}\' to a field.'
你能请任何人帮我弄清楚吗?感谢你的帮助
解决方案
我认为 JSON 构建不正确 - 元数据为空,缺少“类型”和“字段”。请为您的架构尝试以下 JSON。
{"type":"struct","fields":[{"name":"account_type","type":"string","nullable":true,"metadata":{"name":"account_type","scale":0}}]}
推荐阅读
- mysql - 如何解决mysql代码中的WITH语法错误
- python - Selenium 从跨度元素的 XPath 中返回一个空列表
- apache-spark - 如何从字符串列表创建列列表
- kubernetes - 如何修复 k3s 上 nginx 入口的空外部 IP?
- python - 如何使用 Request 正确发送 post 请求
- r - 带有 Rcpp 的 R Cran 包可在除 Solaris 之外的每个系统上构建
- typescript - 打字稿:错误:未知选项'--compiler'
- python - 在pyplot中绘制附加点?
- python - 使用 Python 对 sql 数据库执行查询
- java - `...ByKey()` 在 Kotlin flatbuffers impl 中返回 null