python - “内容长度分隔的消息正文过早结束” - 使用 pyspark 从 S3 存储桶访问 json 文件时出错
问题描述
我正在尝试从 s3 存储桶加载一个大的(大于我的系统内存)json 来触发 Dataframe。但低于错误。
org.apache.http.ConnectionClosedException:内容长度分隔的消息正文过早结束(预期:138345560;收到:67924532
- 我正在使用 spark 2.4.7 和 Hadoop 2.7 发行版,以及 python 3.7。
- 我已经复制了“hadoop-aws:2.7.3.jar”和“aws-sdk:1.7.4.jar”
- Sparkhome/jars/ 目录。已经使用 JDK 1.8(设置 SPARK_HOME、JAVA_HOME、HADOOP_HOME)环境变量。
- 以前有 python 3.8、java 15、spark 3.0 和 hadoop 2.7 - 因为这个错误而降级。
# pyspark_job.py
from pyspark.sql import SparkSession
def create_spark_session():
"""
Create spark session.
Returns:
spark (SparkSession) - spark session
"""
spark = SparkSession \
.builder \
.master("local[4]") \
.appName("Myappname") \
.getOrCreate()
return spark
def setConfigs(spark):
"""
setting the environment configuration properties for s3 access
arguments:
spark (SparkSession) - spark session
"""
ak = 'accesskey' # as of now I am using the keys directly, once i get the basic functionality working, the keys will be taken from property file/env variable
sk = 'secretkey'
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", ak)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", sk)
sc._jsc.hadoopConfiguration().set("fs.s3a.readahead.range", '512M')
def process_events_data(spark, input_path):
"""Process the event data from s3 bucket as json files and get specific info.
Arguments:
spark (SparkSession) - spark session connected to local cluster
input_path (str) - AWS S3 bucket path for source data
returns
Data Frame with event data
"""
df = spark.read.format('json').load(input_path)
#df = spark.read.json("input_path")
return df
def main():
bn = 'bucketname'
sf = 'prefix/'
fp = bn+sf
input_path = 's3a://bucketname/prefix/filename.json'
spark = create_spark_session()
setConfigs(spark)
edf = process_events_data(spark, input_path)
if __name__ == '__main__':
main()
错误跟踪:
20/09/29 22:32:35 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): TaskKilled (Stage cancelled) Traceback (last recent call last): File "pyspark_job.py", line 61,在 main() 文件“pyspark_job.py”,第 58 行,在 main process_events_data(spark, input_path) 文件“pyspark_job.py”,第 41 行,在 process_events_data df = spark.read.format('json').load (输入路径)文件“spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py”,第 166 行,在加载文件“spark_dir\spark-2.4.7_h2.7\python \lib\py4j-0.10.7-src.zip\py4j\java_gateway.py",第 1257 行,调用中 文件“spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\utils.py”,第 63 行,在 deco 文件“spark_dir\spark-2.4.7_h2.7\python\lib\ py4j-0.10.7-src.zip\py4j\protocol.py",第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o42.load 时出错。:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 0.0 中的任务 0 失败 1 次,最近一次失败:阶段 0.0 中丢失任务 0.0(TID 0,本地主机,执行程序驱动程序):org.apache.http .ConnectionClosedException:内容长度分隔的消息正文过早结束(预期:138345560;收到:71284276
已经尝试过以下解决方案,但仍然出现错误: Premature end of Content-Length delimited message body SparkException while reading from S3 using Pyspark
我不确定是否缺少某些配置?还是 S3 方面有问题?还是我需要逐行读取 json 输入?这是我的第一个火花程序,任何人都可以帮忙。
解决方案
推荐阅读
- spring - 如何配置我的 logback 系统,以便它可以在基于时间的滚动周期完成后立即创建日志文件?
- smartsheet-api - 如何检索用户管理报告?
- sql - 如何合并hive中的行?
- python - Python:将键的字典:列表转换为字典:键/值对
- django - 将本地数据库与 Heroku 中部署的项目同步
- linux - Pluggable authentication module是现代linux/unix的内置模块吗
- c - 将多个语句连接在一起仅执行第一个命令
- php - dropzone.js,无法以编程方式创建 dropzone
- gcc - GCC 编译错误:配置:错误:C 编译器无法创建可执行文件
- php - 使用准备好的语句 mysqli 获取查询结果