python - Unable to retain corrupted rows in pyspark using PERMISSIVE mode
Problem description
I have a csv file on which I need to perform some cleanup tasks using pyspark. Before cleaning, I am doing some schema validation checks. Below is my code.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType

# schema for the input data
def get_input_schema():
    return StructType([StructField("Group ID", StringType(), True),
                       StructField("Start Date", DateType(), True),
                       StructField("Start Time", StringType(), True),
                       ...
                       StructField("malformed_rows", StringType(), True)
                       ])

# basic cleanup logic
def main(argv):
    spark = SparkSession.builder.appName('cleaner_job').getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    df = spark.read.option("mode", "PERMISSIVE") \
        .option("dateFormat", "yyyy-MM-dd") \
        .option("columnNameOfCorruptRecord", "malformed_rows") \
        .schema(get_input_schema()) \
        .csv(input_path, header=True)
    # this is where the error is happening
    df_bad = df.filter(df["malformed_rows"].isNotNull())
    df_good = df.filter(df["malformed_rows"].isNull())
    df_good.write.csv(output_path, header=True)
    df_bad.write.csv(output_malformed_path, header=True)
I am using PERMISSIVE mode while reading the csv, and I am trying to split the input dataframe into two dataframes (df_good and df_bad) based on whether the malformed_rows column is null or not. If I do not split the dataframe and write it directly to csv, I can see the malformed_rows column in the output csv. But the code above throws an error saying:
ERROR Utils: Aborting task
java.lang.IllegalArgumentException: malformed_rows does not exist. Available: Group ID, Start Date, Start Time, ...,
at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:306)
at scala.collection.MapLike.getOrElse(MapLike.scala:131)
at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
at scala.collection.AbstractMap.getOrElse(Map.scala:63)
at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:305)
at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$4(CSVFilters.scala:65)
at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$4$adapted(CSVFilters.scala:65)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$3(CSVFilters.scala:65)
at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$3$adapted(CSVFilters.scala:54)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.csv.CSVFilters.<init>(CSVFilters.scala:54)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.<init>(UnivocityParser.scala:101)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$1(CSVFileFormat.scala:138)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
ERROR FileFormatWriter: Job job_20210302150943_0000 aborted.
ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I have gone through the Spark docs, which say that to retain the corrupt data we need to define the corrupt-record column in the schema, and I am doing that. What confuses me is why it only fails when I try to filter the data. Any help in resolving this is much appreciated.
Solution
malformed_rows is the internal corrupt-record column, which is named _corrupt_record by default and which you renamed with:
.option("columnNameOfCorruptRecord", "malformed_rows")
However, since Spark 2.3 you cannot query the data using only this column, as noted in the documentation; you need to cache the df first:
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example, spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count() and spark.read.schema(schema).json(file).select("_corrupt_record").show(). Instead, you can cache or save the parsed results and then send the same query. For example, val df = spark.read.schema(schema).json(file).cache() and then df.filter($"_corrupt_record".isNotNull).count().
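Applied to the code in the question, a minimal sketch of the fix (same variable names as above; the only change is the added .cache() call) could look like this:
# Cache the parsed result so the corrupt-record column can be filtered on (Spark >= 2.3).
# Without the cache, Spark tries to push the filter down into the CSV reader, where
# "malformed_rows" is not a real data column, which triggers the IllegalArgumentException above.
df = spark.read.option("mode", "PERMISSIVE") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("columnNameOfCorruptRecord", "malformed_rows") \
    .schema(get_input_schema()) \
    .csv(input_path, header=True) \
    .cache()

# The split now runs against the cached, already-parsed rows.
df_bad = df.filter(df["malformed_rows"].isNotNull())
df_good = df.filter(df["malformed_rows"].isNull())

df_good.write.csv(output_path, header=True)
df_bad.write.csv(output_malformed_path, header=True)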