csv - 使用 Spark 内置函数或方法在 Pyspark 中解析 csv 文件
问题描述
我正在使用 spark 2.3 版并在一些 poc 上工作,我必须加载一些 csv 文件来触发数据帧。
考虑下面的 csv 作为我需要解析并将其加载到数据框中的示例。给定的 csv 有多个需要识别的不良记录。
id,name,age,loaded_date,sex
1,ABC,32,2019-09-11,M
2,,33,2019-09-11,M
3,XYZ,35,2019-08-11,M
4,PQR,32,2019-30-10,M #invalid date
5,EFG,32, #missing other column details
6,DEF,32,2019/09/11,M #invalid date format
7,XYZ,32,2017-01-01,9 #last column has to be character only
8,KLM,XX,2017-01-01,F
9,ABC,3.2,2019-10-10,M #decimal value for integer data type
10,ABC,32,2019-02-29,M #invalid date
如果我必须使用 python 或 pandas 函数解析它,这将是一件容易的事。
这就是我为此定义架构的方式。
from pyspark.sql.types import*
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("loaded_date", DateType(), True),
StructField("sex", StringType(), True),
StructField("corrupt_record",StringType(), True)])
df=spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("dateFormat", "yyyy-MM-dd") \
.option("nanValue","0") \
.option("nullValue"," ") \
.option("treatEmptyValuesAsNulls","false") \
.option("columnNameOfCorruptRecord", "corrupt_record") \
.schema(schema).load(file)
>>> df.show(truncate=False)
+----+----+----+-----------+----+----------------------+
|id |name|age |loaded_date|sex |corrupt_record |
+----+----+----+-----------+----+----------------------+
|1 |ABC |32 |2019-09-11 |M |null |
|2 |null|33 |2019-09-11 |M |null |
|3 |XYZ |35 |2019-08-11 |M |null |
|4 |PQR |32 |2021-06-10 |M |null |
|5 |EFG |32 |null |null|5,EFG,32, |
|null|null|null|null |null|6,DEF,32,2019/09/11,M |
|7 |XYZ |32 |2017-01-01 |9 |null |
|null|null|null|null |null|8,KLM,XX,2017-01-01,F |
|null|null|null|null |null|9,ABC,3.2,2019-10-10,M|
|10 |ABC |32 |2019-03-01 |M |null |
+----+----+----+-----------+----+----------------------+
上面的代码按预期解析了许多记录,但未能检查无效日期。见记录'4'
& '10'
。它已转换为一些垃圾日期。
我可以将日期加载为字符串类型并创建一些 udf 或使用 cast 来正确解析它并查看输入的日期是否有效。有什么方法可以在不使用自定义 udf 或代码中的更高版本的情况下首先检查无效日期。
另外,我正在寻找一种方法来处理'7'
最后一列具有数值的记录。
解决方案
根据OP 的要求,我在此处记下答案PySpark
-
首先,只需加载没有任何预先指定模式的数据,同样由@AndrzejS 完成
df = spark.read.option("header", "true").csv("data/yourdata.csv")
df.show()
+---+----+---+-----------+----+
| id|name|age|loaded_date| sex|
+---+----+---+-----------+----+
| 1| ABC| 32| 2019-09-11| M|
| 2|null| 33| 2019-09-11| M|
| 3| XYZ| 35| 2019-08-11| M|
| 4| PQR| 32| 2019-30-10| M|
| 5| EFG| 32| null|null|
| 6| DEF| 32| 2019/09/11| M|
| 7| XYZ| 32| 2017-01-01| 9|
| 8| KLM| XX| 2017-01-01| F|
| 9| ABC|3.2| 2019-10-10| M|
| 10| ABC| 32| 2019-02-29| M|
+---+----+---+-----------+----+
然后,我们需要确定哪些值不适合列方案。例如;XX
或32
不能是age
,因此这些值需要标记为Null
。我们做一个测试,如果这个值是一个Integer
或否则。类似地,我们进行测试 ifloaded_date
是否确实是 a date
,最后我们检查 the sex
is either F/M
。请参阅我之前关于这些测试的帖子。
df = df.select('id','name',
'age', (col('age').cast('int').isNotNull() & (col('age').cast('int') - col('age') == 0)).alias('ageInt'),
'loaded_date',(col('loaded_date').cast('date').isNotNull()).alias('loaded_dateDate'),
'sex'
)
df.show()
+---+----+---+------+-----------+---------------+----+
| id|name|age|ageInt|loaded_date|loaded_dateDate| sex|
+---+----+---+------+-----------+---------------+----+
| 1| ABC| 32| true| 2019-09-11| true| M|
| 2|null| 33| true| 2019-09-11| true| M|
| 3| XYZ| 35| true| 2019-08-11| true| M|
| 4| PQR| 32| true| 2019-30-10| false| M|
| 5| EFG| 32| true| null| false|null|
| 6| DEF| 32| true| 2019/09/11| false| M|
| 7| XYZ| 32| true| 2017-01-01| true| 9|
| 8| KLM| XX| false| 2017-01-01| true| F|
| 9| ABC|3.2| false| 2019-10-10| true| M|
| 10| ABC| 32| true| 2019-02-29| false| M|
+---+----+---+------+-----------+---------------+----+
最后,使用if/else
pyspark 就是when/otherwise
将不相关的值标记为Null
。
df = df.withColumn('age',when(col('ageInt')==True,col('age')).otherwise(None))\
.withColumn('loaded_date',when(col('loaded_dateDate')==True,col('loaded_date')).otherwise(None))\
.withColumn('sex',when(col('sex').isin('M','F'),col('sex')).otherwise(None))\
.drop('ageInt','loaded_dateDate')
df.show()
+---+----+----+-----------+----+
| id|name| age|loaded_date| sex|
+---+----+----+-----------+----+
| 1| ABC| 32| 2019-09-11| M|
| 2|null| 33| 2019-09-11| M|
| 3| XYZ| 35| 2019-08-11| M|
| 4| PQR| 32| null| M|
| 5| EFG| 32| null|null|
| 6| DEF| 32| null| M|
| 7| XYZ| 32| 2017-01-01|null|
| 8| KLM|null| 2017-01-01| F|
| 9| ABC|null| 2019-10-10| M|
| 10| ABC| 32| null| M|
+---+----+----+-----------+----+
推荐阅读
- amazon-cloudformation - Unexpected close tag in aws cdk deploy
- c++ - 成员函数有没有办法知道对象是右值还是左值?
- python - Python-pptx:改变堆积条形图的颜色
- python - TypeError: ufunc 循环不支持 csr_matrix 类型的参数 0,它没有可调用的 exp 方法
- palantir-foundry - 如何从铸造融合表中删除重复的行
- arrays - 将列表框的选择存储在数组 VBA 中
- installshield - InstallShield 2011:缺少预定义变量
- python - 如何获得函数 Discord.PY 的结果?
- c++ - C++/CUDA:未解析的外部符号“enum cudaError ...”
- python - 'int' 对象不是可下标的错误,即使对象是字符串