apache-spark - 如何使 pyspark DAG 并行运行
问题描述
我正在使用 pyspark 处理数据并生成一些指标(大约 25/30)。生成彼此独立的每个指标。由于公司限制,我无法粘贴代码。但我的代码流程在下面提到
def metric1_job():
some operations
Write data from above df
def metric2_job()
some operations
Write data from above df
def metric3_job()
.
.
.
def metric25_job()
some operations
Write data from above df
if __name__ == "__main__":
Read Df 1
Read Df 2
Read Df 3
Read Df 4
Read Df 5
Some operations on above Df.
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
.
.
.
metric25_job(df1, df2, df3, df4, df5)
现在 pyspark 在每个函数中写入时停止执行,然后在其他函数中开始处理 DAG。所有这些功能都是 DAG,彼此不依赖。一个明显的解决方案是将然后拆分为单独的文件并作为单独的作业运行。但这对我来说是不可用的。有人可以告诉我如何让 spark 并行运行这些 DAG 并同时并行编写。
非常感谢任何帮助。由于上述工作的串行处理需要太多时间
提前致谢
马尼什
解决方案
尽管大多数 Spark 动作本质上是同步的,就像我们一个接一个地执行两个动作一样,它们总是像一个接一个地顺序执行一样。很少有可以异步执行的操作。
在某些场景下,当 Spark 集群的资源没有被完全利用时,我们可以同时在不同的 RDD 上执行不同的异步操作。
有异步操作,例如:countAsync、collectAsync、takeAsync、foreachAsync 和 foreachPartitionAsync。
在您的情况下,您可以在 foreachPartitionAsync
.
参考: https ://forums.databricks.com/questions/2119/how-do-i-process-several-rdds-all-at-once.html
和
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions
推荐阅读
- r - 1 - diaghat 中的错误:二元运算符的非数字参数
- php - Nginx不处理php文件并保存(php7.2-fpm)?
- javascript - 用于验证复选框的 JavaScript 函数
- python - 是否可以在 JSON 中使用变量并用 Python 解析它?
- r - 当模式包含 unicode 时,str_detect 返回的值比预期的多
- tensorflow - Tensorflow tf.reshape: None / -1,是一样的吗?
- regex - 正则表达式从响应中获取会话令牌值
- c# - 未调用 Xamarin.Forms Vidyo.IO“OnLocalCameraStateUpdated”(Connector.IRegisterLocalCameraEventListener)回调
- xcode - 如何从头开始将 xcode 项目上传到 bitbucket?
- xamarin.forms - 如何更改 ListView 项目的字体系列