首页 > 解决方案 > 如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点

问题描述

我可以使用以下代码从主节点中的 Oracle 数据库读取数据:

 val spark = SparkSession
            .builder
            .master("local[4]")
            .config("spark.executor.memory", "8g")
            .config("spark.executor.cores", 4)
            .config("spark.task.cpus",1)
            .appName("Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate()

 val jdbcDF = spark.read
              .format("jdbc")
              .option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcldb")
              .option("dbtable", "table")
              .option("user", "orcl")
              .option("password", "********")
              .load()

然后我可以在Workers之间重新分配Dataframe:

  val test = jdbcDF.repartition(8,col("ID_Col"))
  test.explain

我的问题是我的数据很大,它们不能放在主 RAM 上。因此,我希望每个节点分别读取自己的数据。我想知道是否有任何方法可以从每个 Worker 的数据库中读取数据并将它们加载到 Spark Dataframe。实际上,我想使用 Scala 或 Python 分别将数据加载到每个 Worker Node 中的 Spark Dataframe。

请指导我如何做到这一点?

非常感谢任何帮助。

标签: apache-sparkpysparkapache-spark-sql

解决方案


local你一起做not have a Resource MgrYARN。您有,但是您可以在具有 N 个内核的同一台机器上适当地设置no Workers并行运行的东西。local[n]

如果您遵循 Alex Ott 的建议并阅读,您将不会加载到 Master。

lowerBound, upperBound, numPartitions您可以在使用读取数据时使用参数来提高加载速度,spark.read.jdbcCores不是Executors在 Workers 上使用。这就是本地的含义以及 Spark 的工作方式。

如果您需要以其他方式进行分区,则需要随后进行重新分区。

如果你有足够的内存和磁盘,你会比较慢,但它会处理。


推荐阅读