首页 > 解决方案 > 如何使 pyspark DAG 并行运行

问题描述

我正在使用 pyspark 处理数据并生成一些指标(大约 25/30)。生成彼此独立的每个指标。由于公司限制,我无法粘贴代码。但我的代码流程在下面提到

def metric1_job():
    some operations
    Write data from above df
def metric2_job()
    some operations
    Write data from above df
def metric3_job()
.
.
.
def metric25_job()
    some operations
    Write data from above df

if __name__ == "__main__":
Read Df 1
Read Df 2
Read Df 3
Read Df 4
Read Df 5

Some operations on above Df.
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
.
.
.
metric25_job(df1, df2, df3, df4, df5)

现在 pyspark 在每个函数中写入时停止执行,然后在其他函数中开始处理 DAG。所有这些功能都是 DAG,彼此不依赖。一个明显的解决方案是将然后拆分为单独的文件并作为单独的作业运行。但这对我来说是不可用的。有人可以告诉我如何让 spark 并行运行这些 DAG 并同时并行编写。

非常感谢任何帮助。由于上述工作的串行处理需要太多时间

提前致谢

马尼什

标签: apache-sparkpyspark

解决方案


尽管大多数 Spark 动作本质上是同步的,就像我们一个接一个地执行两个动作一样,它们总是像一个接一个地顺序执行一样。很少有可以异步执行的操作。

在某些场景下,当 Spark 集群的资源没有被完全利用时,我们可以同时在不同的 RDD 上执行不同的异步操作。

有异步操作,例如:countAsync、collectAsync、takeAsync、foreachAsync 和 foreachPartitionAsync。

在您的情况下,您可以在 foreachPartitionAsync.

参考: https ://forums.databricks.com/questions/2119/how-do-i-process-several-rdds-all-at-once.html

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions


推荐阅读