首页 > 解决方案 > Spark s3数据框选择失败:ConnectionClosedException内容长度过早结束

问题描述

我对 Spark 和整个生态系统都非常陌生,所以这个错误对于发起者来说是愚蠢的,但我没有发现任何支持或类似的问题被发布。

我在 S3 存储桶上有大量数据 (TB),这些数据被拆分为子目录中的数千个 100Mb 镶木地板文件。目前,我只想查询一个文件并选择一些行。我在学习时使用 PySpark 在本地运行 spark (3.0):

代码如下所示:

spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .getOrCreate()

path = "s3a://BUCKET_NAME/DIR/FILE.gz.parquet"
df = spark.read.parquet(path)

df.printSchema()   # this works
df.show(n=10)      # this works 
df.orderBy("sessionID").show(n=5)   # this works
df.select("sessionID").show(n=5)   # this fails

OrderBy 工作正常且快速显示按名称排序的前 5 个。但是,选择查询失败并显示:

19/09/13 01:16:28 ERROR TaskContextImpl: Error in TaskCompletionListener
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 74915373; received: 45265606
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)

所以我相信选择操作没有从 S3 存储桶接收完整数据,但是我该怎么办呢?为什么 OrderBy 有效?

以下问题有点开放式。数据被组织成需要一次性处理的会话。但是每个会话的行分散在每个 parquet 文件和数百个 parquet 文件中,这意味着必须遍历数百 GB 才能拼凑一个完整的会话。所以我希望 Spark 按会话 ID 排序。该处理将由一个单独的 C++ 库完成,因此我必须将会话数据通过管道输出。在本地机器上处理整个数据集将是棘手的,但我可以应用一些选择将数据减少到 50Gb 之类的东西,我希望可以在几个小时内在功能强大的工作站(32 核、64 Gb)上处理这些数据。这可行吗?设计看起来如何?抱歉,这含糊不清,但 Spark 示例要么在一个很小的 ​​JSON 上非常简单,要么假设有非常深入的知识,并且很难从前者过渡到后者。

标签: apache-sparkselectamazon-s3pysparkpyspark-sql

解决方案


在花费数小时浏览不同的配置选项并且基本上没有从我开始的地方开始之后。结果系统管理员安装了最新的 Spark 3.0,但肯定有问题。

我安装了 spark 2.4.4,确保选择了 java 8 Pyspark 错误 - Unsupported class file major version 55

一切都按预期工作


推荐阅读