首页 > 解决方案 > 使用 pyspark RDD 拆分一个意外的 csv 文件。电子病历。纱线内存异常错误

问题描述

我一直在研究这段代码。下面我列出了我在 EMR 上使用的代码和大部分集群属性。该代码的目的是根据一些基本迭代将一些 csv 文件在某个行号处拆分为两个(我在下面的代码中包含了一个简单的拆分)。

我经常收到这个错误“ Container killed by YARN for exceeding memory limits”并遵循这些设计原则(下面的链接)来解决它,但我只是不知道为什么会出现内存问题。我有超过 22GB 的纱线开销,文件在 MB 到个位数 GB 范围内。

我有时使用 r5a.12xlarges 无济于事。我真的没有在这段代码中看到任何类型的内存泄漏。它似乎也很慢,只能在 16 小时内处理 20GB 输出到 S3。这是并行化此拆分操作的好方法吗?有内存泄漏吗?是什么赋予了?

https://aws.amazon.com/premiumsupport/knowledge-center/emr-spark-yarn-memory-limit/

[
    {
        "Classification": "spark",
        "Properties": {
            "spark.maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.yarn.executor.memoryOverheadFactor":".2"
        }
    },
    {
        "Classification": "spark-env",
        "Configurations": [
            {
                "Configurations": [],
                "Properties": {
                    "PYSPARK_PYTHON": "python36"
                },
                "Classification": "export"
            }
        ],
        "Properties": {
        }
    }
]
   def writetxt(txt: Union[List[str], pandas.DataFrame], path: str) -> None:
        s3 = boto3.resource('s3')
        s3path = S3Url(path)
        object = s3.Object(s3path.bucket, s3path.key)
        if isinstance(txt, pandas.DataFrame):
            csv_buffer = StringIO()
            txt.to_csv(csv_buffer)
            object.put(Body=csv_buffer.getvalue())
        else:
            object.put(Body='\n'.join(txt).encode())

    def main(
            x: Iterator[Tuple[str, str]],
            output_files: str
    ) -> None:
        filename, content = x
        filename = os.path.basename(S3Url(filename).key)
        content = content.splitlines()

        # Split the csv file
        columnAttributes, csvData = data[:100], data[100:]

        writetxt(csvData, os.path.join(output_files, 'data.csv', filename))
        writetxt(columnAttributes, os.path.join(output_files, 'attr.csv', filename))


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description='Split some mishapen csv files.')
        parser.add_argument('input_files', type=str,
                            help='The location of the input files.')
        parser.add_argument('output_files', type=str,
                            help='The location to put the output files.')
        parser.add_argument('--nb_partitions', type=int, default=4)
        args = parser.parse_args()

        # creating the context
        sc = SparkContext(appName="Broadcom Preprocessing")

        # We use minPartitions because otherwise small files get put in the same partition together
        # by default, which we have a lot of
        # We use foreachPartition to reduce the number of function calls, which slow down spark
        distFiles = sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
            .foreach(partial(main, output_files=args.output_files))

标签: apache-sparkpysparkrdd

解决方案


我认为您的内存问题是因为您正在使用 Python 代码进行实际的数据拆分。Spark 进程在 JVM 中运行,但是当您调用自定义 Python 代码时,必须将相关数据序列化到 Python 进程(在每个工作节点上)才能执行。这增加了很多开销。我相信您可以完全使用 Spark 操作来完成您想要做的事情——这意味着最终程序将完全在基于 JVM 的 Spark 进程中运行。

尝试这样的事情:

from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import *

input_path = "..."

split_num = 100

# load filenames & contents
filesDF = spark.createDataFrame( sc.wholeTextFiles(input_path), ['filename','contents'] )

# break into individual lines & number them
linesDF = filesDF.select( "filename", \
                          row_number().over(Window.partitionBy("filename").orderBy("filename")).alias("line_number"), \
                          explode(split(col("contents"), "\n")).alias("contents") )

# split into headers & body
headersDF = linesDF.where(col("line_number") == lit(1))
bodyDF = linesDF.where(col("line_number") > lit(1))

# split the body in 2 based
splitLinesDF = bodyDF.withColumn("split", when(col("line_number") < lit(split_num), 0).otherwise(1))
split_0_DF = splitLinesDF.where(col("split") == lit(0)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")
split_1_DF = splitLinesDF.where(col("split") == lit(1)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")

# collapse all lines back down into a file
firstDF = split_0_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))
secondDF = split_1_DF.groupBy("filename").agg(concat_ws("\n",collect_list(col("contents"))).alias("contents"))

# pandas-UDF for more memory-efficient transfer of data from Spark to Python
@pandas_udf(returnType=IntegerType())
def writeFile( filename, contents ):
  <save to S3 here>

# write each row to a file
firstDF.select( writeFile( col("filename"), col("contents") ) )
secondDF.select( writeFile( col("filename"), col("contents") ) )

最后,您将需要使用一些自定义 python 代码将每个拆分文件保存到 S3(或者,您可以只用 Scala/Java 编写所有代码)。通过 pandas UDF 执行此操作比将标准 python 函数传递给.foreach(...). 在内部,spark 会将数据以块的形式序列化为 Arrow 格式(每个分区一个),这将非常有效。

此外,您似乎正试图在单个请求中将整个对象放入 S3。如果数据太大,它将失败。您应该查看 S3 流式上传功能。


推荐阅读