apache-spark - 使用 Spark 避免顺序迭代
问题描述
我有一个 python 脚本,它遍历 s3 中的 URL 列表以重新分区每个 URL 中的镶木地板文件,然后将合并的文件写入另一个 s3 目标。对于一小部分 URL,我使用 python 的 multiprocessing.Pool 函数来并行化进程。
我现在需要为数千个 URL 运行这个逻辑。为了能够按时完成所有 URL 的重新分区,我想利用 pyspark 库并将其部署为集群上的 spark 作业。这是 Spark 中的代码:
def myfunction(x):
# spark session is available here
spark.read.parquet("s3_src").coalesce(1).write.parquet("s3_dest" + x +"/")
if __name__ == "__main__":
# spark context is initiated and available here
rdd_urls = [url1, url2, url3, url4, ……, urlN]
rdd_urls.map(lambda x: myfunction(x))
我尝试同时使用 RDD.map() 和 RDD.foreach() 但意识到 Spark 执行程序无法处理 for 循环中的内部代码块并引发以下错误:
_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
我目前的理解是执行者不能将火花作业提交到集群,但只有驱动程序可以这样做。但作为 Spark 的新手,我试图找出如何在 Spark 中实现相同的目标。非常感谢任何有关代码示例的帮助。
解决方案
推荐阅读
- snowflake-cloud-data-platform - ON 子句中的聚合函数无效 - 在 Snowflake SQL 中加入数组重叠/数组包含
- php - 我正在尝试添加乌尔都语字体,但它不起作用
- c - 结构、typedef 和 malloc、calloc
- android - ViewPager 中 Fragment 中的 MapView:调用 getMapAsync() 后永远不会调用 onMapReady()
- wso2-am - WSO2 APIM 3.0.0 MB STORE database
- qt - PlaceholderText 无法在 QTextEdit 中显示
- javascript - 如何在滚动时补偿android移动浏览器的隐藏地址栏?如何在所有浏览器中正确测量“vh”?
- sql - 插入时应在表中插入具有列名的值递增
- bpmn - 脚本任务中的 Camunda 模拟函数调用
- android - 旋转或更改语言对话框后再次显示