首页 > 解决方案 > 使用 Spark 避免顺序迭代

问题描述

我有一个 python 脚本,它遍历 s3 中的 URL 列表以重新分区每个 URL 中的镶木地板文件,然后将合并的文件写入另一个 s3 目标。对于一小部分 URL,我使用 python 的 multiprocessing.Pool 函数来并行化进程。

我现在需要为数千个 URL 运行这个逻辑。为了能够按时完成所有 URL 的重新分区,我想利用 pyspark 库并将其部署为集群上的 spark 作业。这是 Spark 中的代码:

def myfunction(x):
  # spark session is available here
  spark.read.parquet("s3_src").coalesce(1).write.parquet("s3_dest" + x +"/")

if __name__ == "__main__":
  # spark context is initiated and available here
  rdd_urls = [url1, url2, url3, url4, ……, urlN]
  rdd_urls.map(lambda x: myfunction(x))

我尝试同时使用 RDD.map() 和 RDD.foreach() 但意识到 Spark 执行程序无法处理 for 循环中的内部代码块并引发以下错误:

_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

我目前的理解是执行者不能将火花作业提交到集群,但只有驱动程序可以这样做。但作为 Spark 的新手,我试图找出如何在 Spark 中实现相同的目标。非常感谢任何有关代码示例的帮助。

标签: apache-sparkpyspark

解决方案


推荐阅读