首页 > 解决方案 > Pyspark 笔记本使数据块中的集群过载

问题描述

我有一个使用多个 UDF 的 pyspark 代码。该代码的目的是根据多层次的组织数据生成人类可读的评论。数据是 4M 行 32 列,所有这些都是顺序需要的。

输入数据框列:

col1    col2    col3    col4    col5.....col30  amt1    amt2    amt3    amt4

该代码将一次取 2 列,根据 4 个数量列生成语句,然后取下一组列并继续直到 col30。其中一些级别的逻辑与其他级别不同。

Level1 : col1 and col2 along with 4 amt columns -->save output in output1 dataframe
Level2 : col2 and col3 along with 4 amt columns -->save output in output2 dataframe
Level3 : col3 and col4 along with 4 amt columns -->save output in output3 dataframe
Level4 : col4 and col5 along with 4 amt columns -->save output in output4 dataframe
and so on...

根据不同的数据参数,我需要做同样的事情大约 42 次。

现在在每次迭代结束时,我执行一个 reduce 函数来组合所有输出数据帧,然后写入 parquet 文件。这发生在每次迭代中。完成所有迭代后,我读取镶木地板文件。缩进是forfor循环

df_list = [output1, output2, output3, ....]
df_1 = reduce(DataFrame.unionAll, df_list).cache()  
df_1.coalesce(1).write.mode('append').parquet(/FileStore/tables/newfile)
df_final_temp = spark.read.parquet(/FileStore/tables/newfile)

我遇到的问题是,每当我编写镶木地板文件时,大约需要 20 分钟。每次它将有大约 56 行和 12 列。当我阅读镶木地板文件时,大约需要 2 小时。它应该有大约 2300 行和 12 列。这主要导致 spark 服务器自动重新启动。在开始写入 parquet 文件之前,代码本身会运行 3 小时。

有没有办法优化这个?

我的 databricks 集群有 8 个内核和 16 个内核的 56GB 内存。欢迎任何建议。

标签: pythondataframepysparkdatabricks

解决方案


推荐阅读