apache-spark - 如何使用 Apache Spark 并行读写两个 DataFrame
问题描述
我正在从 JDBC 数据源创建两个 DataFrame,然后将它们都写入 S3 存储桶。写入 S3 的文件的时间戳相隔 20 秒,这告诉我这些操作不是并行执行的。出于测试目的,从相同的数据库/表和相同数量的行加载数据。如何使读取和写入并行执行?
python 脚本在具有 2 个 DPU(标准工作器类型)的 AWS Glue 开发终端节点上运行。
df1 = spark.read.format("jdbc").option("driver", driver).option("url", url).option("user", user).option("password", password).option("dbtable", query1).option("fetchSize", 50000).load()
df2 = spark.read.format("jdbc").option("driver", driver).option("url", url).option("user", user).option("password", password).option("dbtable", query2).option("fetchSize", 50000).load()
df1.write.mode("append").format("csv").option("compression", "gzip").option("timestampFormat", "yyyy.MM.dd HH:mm:ss,SSS").option("maxRecordsPerFile", 1000000).save("s3://bucket-name/test1")
df2.write.mode("append").format("csv").option("compression", "gzip").option("timestampFormat", "yyyy.MM.dd HH:mm:ss,SSS").option("maxRecordsPerFile", 1000000).save("s3://bucket-name/test2")
解决方案
启用粘合作业的并发执行,然后在单个作业中运行该作业两次,因为 Spark 是分布式处理,所以无法并行保存数据帧。
推荐阅读
- tensorflow - WARNING:root:Variable [...] 在检查点不可用
- r - 根据前几天的数据(滞后变量)创建新的数据框列
- google-kubernetes-engine - 如何确定 gke 集群是否设置了自动升级
- javascript - 如何在同一个函数中使用 res.render 和 res.redirect
- java - 阻止用户调用父函数
- matlab - Matlab的抛物线方程工具箱(PETOOL)的接口问题
- php - 带有 Ubuntu LAMP 的 Windows WSL - PHP 未执行
- php - 数据未使用 Laravel Api 资源保存到数据透视表中
- python - 比较数据框的两列并创建一个新列
- c# - 如何在 Unity 中获取 android 的内存根?