apache-spark - Spark s3数据框选择失败:ConnectionClosedException内容长度过早结束
问题描述
我对 Spark 和整个生态系统都非常陌生,所以这个错误对于发起者来说是愚蠢的,但我没有发现任何支持或类似的问题被发布。
我在 S3 存储桶上有大量数据 (TB),这些数据被拆分为子目录中的数千个 100Mb 镶木地板文件。目前,我只想查询一个文件并选择一些行。我在学习时使用 PySpark 在本地运行 spark (3.0):
代码如下所示:
spark = SparkSession.builder \
.master("local") \
.appName("Test") \
.getOrCreate()
path = "s3a://BUCKET_NAME/DIR/FILE.gz.parquet"
df = spark.read.parquet(path)
df.printSchema() # this works
df.show(n=10) # this works
df.orderBy("sessionID").show(n=5) # this works
df.select("sessionID").show(n=5) # this fails
OrderBy 工作正常且快速显示按名称排序的前 5 个。但是,选择查询失败并显示:
19/09/13 01:16:28 ERROR TaskContextImpl: Error in TaskCompletionListener
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 74915373; received: 45265606
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
所以我相信选择操作没有从 S3 存储桶接收完整数据,但是我该怎么办呢?为什么 OrderBy 有效?
以下问题有点开放式。数据被组织成需要一次性处理的会话。但是每个会话的行分散在每个 parquet 文件和数百个 parquet 文件中,这意味着必须遍历数百 GB 才能拼凑一个完整的会话。所以我希望 Spark 按会话 ID 排序。该处理将由一个单独的 C++ 库完成,因此我必须将会话数据通过管道输出。在本地机器上处理整个数据集将是棘手的,但我可以应用一些选择将数据减少到 50Gb 之类的东西,我希望可以在几个小时内在功能强大的工作站(32 核、64 Gb)上处理这些数据。这可行吗?设计看起来如何?抱歉,这含糊不清,但 Spark 示例要么在一个很小的 JSON 上非常简单,要么假设有非常深入的知识,并且很难从前者过渡到后者。
解决方案
在花费数小时浏览不同的配置选项并且基本上没有从我开始的地方开始之后。结果系统管理员安装了最新的 Spark 3.0,但肯定有问题。
我安装了 spark 2.4.4,确保选择了 java 8 Pyspark 错误 - Unsupported class file major version 55
一切都按预期工作
推荐阅读
- c# - 用于询问任何类型问题的通用类设计?
- html - 带有表单 POST 按钮的 Bootstrap 4 卡片 - 无法与卡片底部对齐
- node.js - 如何在sheetjs中读取特定的行和行
- javascript - 无法输入
场地 - c++ - 检查二叉树是否为 bst
- transactions - 以太坊交易因为“还原”而“失败”。“警告!合约执行过程中遇到错误[已恢复]”
- npm - 终端上的 Gulp 安装错误
- python - 尝试运行 manage.py migrate 时出错
- compression - 如何找到两个 s 表达式之间的编辑距离?
- python - 如何使滚动条在 Tkinter 框架中工作?