首页 > 解决方案 > “内容长度分隔的消息正文过早结束” - 使用 pyspark 从 S3 存储桶访问 json 文件时出错

问题描述

我正在尝试从 s3 存储桶加载一个大的(大于我的系统内存)json 来触发 Dataframe。但低于错误。

org.apache.http.ConnectionClosedException:内容长度分隔的消息正文过早结束(预期:138345560;收到:67924532

  1. 我正在使用 spark 2.4.7 和 Hadoop 2.7 发行版,以及 python 3.7。
  2. 我已经复制了“hadoop-aws:2.7.3.jar”和“aws-sdk:1.7.4.jar”
  3. Sparkhome/jars/ 目录。已经使用 JDK 1.8(设置 SPARK_HOME、JAVA_HOME、HADOOP_HOME)环境变量。
  4. 以前有 python 3.8、java 15、spark 3.0 和 hadoop 2.7 - 因为这个错误而降级。
# pyspark_job.py
from pyspark.sql import SparkSession

def create_spark_session():
    """
    Create spark session.
    Returns:
        spark (SparkSession) - spark session
    """
    spark = SparkSession \
        .builder \
        .master("local[4]") \
        .appName("Myappname") \
        .getOrCreate()
    return spark

def setConfigs(spark):
    """
    setting the environment configuration properties for s3 access
    arguments:
        spark (SparkSession) - spark session
    """
    ak = 'accesskey'    # as of now I am using the keys directly, once i get the basic functionality working, the keys will be taken from property file/env variable
    sk = 'secretkey'
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",  ak)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", sk)
    sc._jsc.hadoopConfiguration().set("fs.s3a.readahead.range", '512M')

def process_events_data(spark, input_path):
    """Process the event data from s3 bucket as json files and get specific info.
    Arguments:
        spark (SparkSession) - spark session connected to local cluster
        input_path (str) - AWS S3 bucket path for source data
    returns
        Data Frame with event data
    """
    df = spark.read.format('json').load(input_path)
    #df = spark.read.json("input_path")
    return df
    

def main():
    bn = 'bucketname'
    sf = 'prefix/'
    fp = bn+sf
    input_path = 's3a://bucketname/prefix/filename.json'
    
    spark = create_spark_session()
    setConfigs(spark)
   
    edf = process_events_data(spark, input_path)

if __name__ == '__main__':
    main()

错误跟踪:

20/09/29 22:32:35 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): TaskKilled (Stage cancelled) Traceback (last recent call last): File "pyspark_job.py", line 61,在 main() 文件“pyspark_job.py”,第 58 行,在 main process_events_data(spark, input_path) 文件“pyspark_job.py”,第 41 行,在 process_events_data df = spark.read.format('json').load (输入路径)文件“spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py”,第 166 行,在加载文件“spark_dir\spark-2.4.7_h2.7\python \lib\py4j-0.10.7-src.zip\py4j\java_gateway.py",第 1257 行,调用中 文件“spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\utils.py”,第 63 行,在 deco 文件“spark_dir\spark-2.4.7_h2.7\python\lib\ py4j-0.10.7-src.zip\py4j\protocol.py",第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o42.load 时出错。:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 0.0 中的任务 0 失败 1 次,最近一次失败:阶段 0.0 中丢失任务 0.0(TID 0,本地主机,执行程序驱动程序):org.apache.http .ConnectionClosedException:内容长度分隔的消息正文过早结束(预期:138345560;收到:71284276

已经尝试过以下解决方案,但仍然出现错误: Premature end of Content-Length delimited message body SparkException while reading from S3 using Pyspark

我不确定是否缺少某些配置?还是 S3 方面有问题?还是我需要逐行读取 json 输入?这是我的第一个火花程序,任何人都可以帮忙。

标签: pythonjsonapache-sparkamazon-s3pyspark

解决方案


推荐阅读