首页 > 解决方案 > 在 pyspark 上加入两次计算,也许我不明白懒惰?

问题描述

自从我上次使用 spark 太久了,我再次使用 Spark 3.1,这是我的问题:我有 20M 行加入了 400M 行,原始代码是:

times= [50000,20000,10000,1000]
for time in times:
    join = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-time))
                                 ], 'left')

知道每次迭代(时间变量)都包含在与每个值进行比较之前我认为使 DataFrame 更轻的下一个迭代,所以编码如下:

times= [50000,20000,10000,1000]

join = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-50000))
                                 ], 'left')

join.checkpoint() # Save current state, and cleaned dataframe

for time in times:
    step_join = join_df.where((join_df["task"]) = (join_df["task"]-time)))
    # Make calculations and Store result for the iteration...

在查看 Spark 历史服务器上的可视 SQL 图时,似乎没有使用我在第二次连接上改进的解决方案(?),它在每次迭代时再次使整个左连接,而不是使用更干净、更轻的 DataFrame .

我的最终想法是在下一次迭代中使用新的 df,这样每个过滤器都会更轻。我的想法正确吗?我错过了什么吗?

它的样子,这是一个仍在运行的代码,中间的 SortMergeJoin 是解耦过滤器,第二个“过滤器”只过滤了一点,但在左右你可以看到它再次计算SortMergeJoin 而不是重用之前计算的。 在此处输入图像描述

这就是处理的样子,每次相同的计算加上过滤器 在此处输入图像描述

上次必须删除检查点,因为连接上有 55B 行,很难存储数据 (>100TB)

我的集群配置为 30 个实例 64vcore 488GB RAM + 驱动程序

        "spark.executor.instances", "249").config("spark.executor.memoryOverhead", "10240").config(
        "spark.executor.memory", "87g").config("spark.executor.cores", "12").config("spark.driver.cores", "12").config(
        "spark.default.parallelism", "5976").config("spark.sql.adaptive.enabled", "true").config(
        "spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "3100").config(
        "spark.yarn.driver.memoryOverhead", "10240").config("spark.sql.autoBroadcastJoinThreshold", "2100").config(
        "spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()

我在这个网站上使用 excel 计算器来调整除spark.sql.shuffle.partitions之外的所有内容 https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark- config-cheatsheet/现在每个节点使用 10 个执行器

尝试在连接上使用 .cache() 它仍然比 4 个并行连接慢,第一个连接更慢。请注意,.cache() 对子集有好处,但对于 100TB 的连接结果,它会更慢,因为它会缓存到磁盘。谢谢!

标签: apache-sparkpysparkapache-spark-sql

解决方案


更新答案(2021 年 5 月 9 日):

我认为您可以尝试withColumn通过在连接后指定值来使用方法在数据中指定一个分区列when(.. ,.. ).otherwise(..)(您可以为 4 个不同的值嵌套多个 when/otherwise 块)。不仅仅是用partitionBy. 在这种情况下,您不需要重新计算 4 次。一次计算就足够了。

老答案:

我认为您可能想使用df.cache()函数来防止相同的计算。

join_df = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-50000))
                                 ], 'left').cache()

Spark 将计算所有结果并将其保存到内存和磁盘中。它将重用预先计算join_df的新过滤器。


推荐阅读