apache-spark - 在 pyspark 上加入两次计算,也许我不明白懒惰?
问题描述
自从我上次使用 spark 太久了,我再次使用 Spark 3.1,这是我的问题:我有 20M 行加入了 400M 行,原始代码是:
times= [50000,20000,10000,1000]
for time in times:
join = (df_a.join(df_b,
[
df_a["a"] == df_b["a"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
(df_a["task"]) = (df_b["task"]-time))
], 'left')
知道每次迭代(时间变量)都包含在与每个值进行比较之前我认为使 DataFrame 更轻的下一个迭代,所以编码如下:
times= [50000,20000,10000,1000]
join = (df_a.join(df_b,
[
df_a["a"] == df_b["a"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
(df_a["task"]) = (df_b["task"]-50000))
], 'left')
join.checkpoint() # Save current state, and cleaned dataframe
for time in times:
step_join = join_df.where((join_df["task"]) = (join_df["task"]-time)))
# Make calculations and Store result for the iteration...
在查看 Spark 历史服务器上的可视 SQL 图时,似乎没有使用我在第二次连接上改进的解决方案(?),它在每次迭代时再次使整个左连接,而不是使用更干净、更轻的 DataFrame .
我的最终想法是在下一次迭代中使用新的 df,这样每个过滤器都会更轻。我的想法正确吗?我错过了什么吗?
它的样子,这是一个仍在运行的代码,中间的 SortMergeJoin 是解耦过滤器,第二个“过滤器”只过滤了一点,但在左右你可以看到它再次计算SortMergeJoin 而不是重用之前计算的。
上次必须删除检查点,因为连接上有 55B 行,很难存储数据 (>100TB)
我的集群配置为 30 个实例 64vcore 488GB RAM + 驱动程序
"spark.executor.instances", "249").config("spark.executor.memoryOverhead", "10240").config(
"spark.executor.memory", "87g").config("spark.executor.cores", "12").config("spark.driver.cores", "12").config(
"spark.default.parallelism", "5976").config("spark.sql.adaptive.enabled", "true").config(
"spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "3100").config(
"spark.yarn.driver.memoryOverhead", "10240").config("spark.sql.autoBroadcastJoinThreshold", "2100").config(
"spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()
我在这个网站上使用 excel 计算器来调整除spark.sql.shuffle.partitions之外的所有内容 https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark- config-cheatsheet/现在每个节点使用 10 个执行器
尝试在连接上使用 .cache() 它仍然比 4 个并行连接慢,第一个连接更慢。请注意,.cache() 对子集有好处,但对于 100TB 的连接结果,它会更慢,因为它会缓存到磁盘。谢谢!
解决方案
更新答案(2021 年 5 月 9 日):
我认为您可以尝试withColumn
通过在连接后指定值来使用方法在数据中指定一个分区列when(.. ,.. ).otherwise(..)
(您可以为 4 个不同的值嵌套多个 when/otherwise 块)。不仅仅是用partitionBy
. 在这种情况下,您不需要重新计算 4 次。一次计算就足够了。
老答案:
我认为您可能想使用df.cache()
函数来防止相同的计算。
join_df = (df_a.join(df_b,
[
df_a["a"] == df_b["a"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
(df_a["task"]) = (df_b["task"]-50000))
], 'left').cache()
Spark 将计算所有结果并将其保存到内存和磁盘中。它将重用预先计算join_df
的新过滤器。
推荐阅读
- typescript - 在抛出任何错误时进行开玩笑测试以失败?
- java - 为什么我不能一次初始化多个数组?
- java - Spring MVC @ModelAttribute 不区分大小写
- pandas - 在应用函数中访问索引
- python-3.x - 来自 OneHotEncoder 的功能名称
- java - 是否建议根据参数更改方法的返回类型?
- python - 在 python 项目的不同依赖项的环境之间自动切换
- angular - 如何使用 Webpack 5 编码拆分 Angular 11
- javascript - 不能在 vue 组件中包含高位图表
- mysql - MySQL 已安装,但在 MacOS Mojave 上找不到