apache-spark - Calling .show() and .toPandas() on an S3 DataFrame raises org.apache.http.ConnectionClosedException
Problem description
I created a PySpark DataFrame df from Parquet data stored on AWS S3. Calling df.count() works, but df.show() or df.toPandas() fails with the following error:
Py4JJavaError: An error occurred while calling o41.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0
failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 14, 10.20.202.97,
executor driver): org.apache.http.ConnectionClosedException: Premature end of Content-
Length delimited message body (expected: 77,826,675; received: 8,192)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:72)
at org.apache.hadoop.fs.s3a.S3AInputStream.seek(S3AInputStream.java:115)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
at org.apache.parquet.hadoop.util.H1SeekableInputStream.seek(H1SeekableInputStream.java:46)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1157)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have answered my own question below: a Java update appears to be the cause. Can anyone solve this without downgrading Java?
Solution
This started happening after the latest Java update (1.8.0_272). Reverting to 1.8.0_265 resolves the problem.
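To confirm whether a machine is running the build named above, a small check like the following may help. It is only a sketch: the helper names (parse_java_version, installed_java_version, is_affected) are made up for illustration, it only recognizes the legacy 1.8.0_NNN version format, and it inspects whichever java is on PATH, which is an assumption and may not be the JVM Spark actually launches (for example when JAVA_HOME points elsewhere).

```python
import re
import subprocess

# Builds named in the answer: 1.8.0_272 showed the error, 1.8.0_265 did not.
# Nothing is assumed here about any other build.
AFFECTED_BUILD = (1, 8, 0, 272)
KNOWN_GOOD_BUILD = (1, 8, 0, 265)


def parse_java_version(text):
    """Extract (major, minor, patch, update) from a legacy '1.8.0_NNN' string.

    Returns None when the text does not contain that format.
    """
    match = re.search(r"(\d+)\.(\d+)\.(\d+)_(\d+)", text)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups())


def installed_java_version(java_cmd="java"):
    """Run `java -version` and parse its banner, or return None if java is missing."""
    try:
        # `java -version` writes its banner to stderr, not stdout.
        result = subprocess.run(
            [java_cmd, "-version"], capture_output=True, text=True
        )
    except FileNotFoundError:
        return None
    return parse_java_version(result.stderr)


def is_affected(version):
    """True only for the exact build reported as failing in the answer."""
    return version == AFFECTED_BUILD


if __name__ == "__main__":
    version = installed_java_version()
    if version is None:
        print("Could not determine a 1.8.0_NNN Java version.")
    elif is_affected(version):
        print("Java build matches 1.8.0_272; reverting to 1.8.0_265 is the reported fix.")
    else:
        print("Java build is not 1.8.0_272:", version)
```

Running the script before starting the Spark session gives a quick indication of whether the downgrade described in the answer applies to the current environment.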