Problem with files in a pyspark dataframe

Problem description

I have been working on a tool in pyspark that filters on a search and then sorts the results. The dataframe is an assembly of 1,400+ CSVs. When I try to run the code I get a very long error message, which appears to boil down to a Java error for an unexpected EOF:

Py4JJavaError: An error occurred while calling o1331.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 202 in stage 56.0 failed 4 times, most recent failure: Lost task 202.3 in stage 56.0 (TID 7632, emr-master-f35-eels.sss.local, executor 31): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
         Auto configuration enabled=true
         Autodetect column delimiter=false
         Autodetect quotes=false
         Column reordering enabled=true
         Delimiters for detection=null
         Empty value=
         Escape unquoted values=false
         Header extraction enabled=null
         Headers=null
         Ignore leading whitespaces=false
         Ignore leading whitespaces in quotes=false
         Ignore trailing whitespaces=false
         Ignore trailing whitespaces in quotes=false
         Input buffer size=128
         Input reading on separate thread=false
         Keep escape sequences=false
         Keep quotes=false
         Length of content displayed on error=-1
         Line separator detection enabled=false
         Maximum number of characters per column=-1
         Maximum number of columns=20480
         Normalize escaped line separators=true
         Null value=
         Number of records to read=all
         Processor=none
         Restricting data in exceptions=false
         RowProcessor error handler=null
         Selected fields=field selection: [2]
         Skip bits as whitespace=true
         Skip empty lines=true
         Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
         CsvFormat:
                 Comment character=\0
                 Field delimiter=,
                 Line separator (normalized)=\n
                 Line separator sequence=\n
                 Quote character="
                 Quote escape character="
                 Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
         at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
         at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
         at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
         at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
         at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
         at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
         at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
         at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
         at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
         ... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
         at java.io.InputStreamReader.read(InputStreamReader.java:184)
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
         ... 23 more

Driver stacktrace:
         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
         at scala.Option.foreach(Option.scala:257)
         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1080)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
         at org.apache.spark.rdd.RDD.reduce(RDD.scala:1062)
         at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1484)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
         at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1471)
         at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
         at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3267)
         at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3264)
         at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
         at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
         at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
         at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
         at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3264)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
         at py4j.Gateway.invoke(Gateway.java:282)
         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
         at py4j.commands.CallCommand.execute(CallCommand.java:79)
         at py4j.GatewayConnection.run(GatewayConnection.java:238)
         at java.lang.Thread.run(Thread.java:748)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
         Auto configuration enabled=true
         Autodetect column delimiter=false
         Autodetect quotes=false
         Column reordering enabled=true
         Delimiters for detection=null
         Empty value=
         Escape unquoted values=false
         Header extraction enabled=null
         Headers=null
         Ignore leading whitespaces=false
         Ignore leading whitespaces in quotes=false
         Ignore trailing whitespaces=false
         Ignore trailing whitespaces in quotes=false
         Input buffer size=128
         Input reading on separate thread=false
         Keep escape sequences=false
         Keep quotes=false
         Length of content displayed on error=-1
         Line separator detection enabled=false
         Maximum number of characters per column=-1
         Maximum number of columns=20480
         Normalize escaped line separators=true
         Null value=
         Number of records to read=all
         Processor=none
         Restricting data in exceptions=false
         RowProcessor error handler=null
         Selected fields=field selection: [2]
         Skip bits as whitespace=true
         Skip empty lines=true
         Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
         CsvFormat:
                  Comment character=\0
                 Field delimiter=,
                 Line separator (normalized)=\n
                 Line separator sequence=\n
                 Quote character="
                 Quote escape character="
                 Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
         at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
         at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
         at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
         at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
         at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
         at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
         at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
         at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
         at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
         ... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
         at java.io.InputStreamReader.read(InputStreamReader.java:184)
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
         ... 23 more

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o1331.collectToPython.\n', JavaObject id=o1338), <traceback object at 0x7ff3f2ff9230>)

I went through the process of running the code on each CSV individually and narrowed it down to the 6 files that cause this error. Obviously I could just drop those 6 files from the list and the code would run fine, but if there is a way to diagnose and possibly repair those files in code, I would like to try that route first. Any suggestions or ideas on how to troubleshoot this?
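
A minimal sketch of that kind of per-file check, assuming `spark` is an active SparkSession and `csv_paths` (a hypothetical name) holds the list of file paths:

bad_files = []
for path in csv_paths:
    try:
        # count() forces a full scan, so a truncated or corrupt archive
        # surfaces the same EOFException shown in the stack trace above
        spark.read.csv(path, header=True).count()
    except Exception as exc:
        bad_files.append((path, str(exc)))

print(bad_files)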

Edit

Based on a suggestion below, I tried opening the file and printing its contents with the following code:

with open('file.csv.bz2', 'r', encoding='ISO-8859-1') as f:
    lines = f.readlines()
    print(lines)

This runs without any problem. However, when I then try to open the same file in pandas I get an EOFError.
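
Presumably the plain open() above reads the raw compressed bytes as text rather than decompressing them, which is why it succeeds. A small check that actually decompresses the archive (same filename as above) should surface truncation as an EOFError, matching the pandas behaviour:

import bz2

try:
    with bz2.open('file.csv.bz2', 'rt', encoding='ISO-8859-1') as f:
        for _ in f:  # stream through the decompressed lines
            pass
    print('archive decompressed cleanly')
except EOFError as exc:
    # a truncated bz2 stream ends before its end-of-stream marker
    print('truncated archive:', exc)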

Tags: pyspark

Solution


You can load the files and still keep every record:

  1. Load all the records, minus those 6 files, into a single dataframe.

  2. Load the 6 files with the schema from #1, using PERMISSIVE mode, and keep the corrupt-record column (see the sketch after this list).

  3. Alternatively, you can rename the corrupt-record column. See: Unable to keep corrupted rows in pyspark using PERMISSIVE mode.

  4. Now you can review the malformed records and decide whether to drop them, write them to a dead-letter queue, log them, or whatever you choose.
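
A minimal sketch of steps 1, 2 and 4, assuming `spark` is an active SparkSession and that `good_paths` / `bad_paths` (hypothetical names) hold the clean files and the six problem files:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType

# 1. load the files that parse cleanly and reuse their schema
good_df = spark.read.csv(good_paths, header=True)

# to actually retain malformed rows, the corrupt-record column has to be
# part of the schema (see the question linked in step 3)
schema = StructType(good_df.schema.fields +
                    [StructField("_corrupt_record", StringType(), True)])

# 2./3. load the six problem files with that schema in PERMISSIVE mode
bad_df = (spark.read
          .schema(schema)
          .option("header", "true")
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .csv(bad_paths))

# 4. split out the malformed rows and handle them however you decide
corrupt_rows = bad_df.filter(F.col("_corrupt_record").isNotNull())
clean_rows = bad_df.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")

all_records = good_df.unionByName(clean_rows)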

