apache-spark - 如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点
问题描述
我可以使用以下代码从主节点中的 Oracle 数据库读取数据:
val spark = SparkSession
.builder
.master("local[4]")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", 4)
.config("spark.task.cpus",1)
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcldb")
.option("dbtable", "table")
.option("user", "orcl")
.option("password", "********")
.load()
然后我可以在Workers之间重新分配Dataframe:
val test = jdbcDF.repartition(8,col("ID_Col"))
test.explain
我的问题是我的数据很大,它们不能放在主 RAM 上。因此,我希望每个节点分别读取自己的数据。我想知道是否有任何方法可以从每个 Worker 的数据库中读取数据并将它们加载到 Spark Dataframe。实际上,我想使用 Scala 或 Python 分别将数据加载到每个 Worker Node 中的 Spark Dataframe。
请指导我如何做到这一点?
非常感谢任何帮助。
解决方案
和local
你一起做not have a Resource Mgr
YARN。您有,但是您可以在具有 N 个内核的同一台机器上适当地设置no Workers
并行运行的东西。local[n]
如果您遵循 Alex Ott 的建议并阅读,您将不会加载到 Master。
lowerBound, upperBound, numPartitions
您可以在使用读取数据时使用参数来提高加载速度,spark.read.jdbc
而Cores
不是Executors
在 Workers 上使用。这就是本地的含义以及 Spark 的工作方式。
如果您需要以其他方式进行分区,则需要随后进行重新分区。
如果你有足够的内存和磁盘,你会比较慢,但它会处理。
推荐阅读
- dart - 如何使 RefreshIndicator 消失?
- google-analytics - 除了 Google Tag Manager 之外,还执行 Google Analytics Functions
- mysql - 无法使用 dropwizard 和 mysql docker 容器迁移数据库
- graphql - 错误:不变违规:gatsby-source-graphql 需要指定选项“typeName”
- python - 在类之间共享属性:这里是多继承权和“pythonic”吗?
- php - 为什么第一个 php 文件返回错误信息,而第二个返回正确信息?
- laravel - ReflectionException : 类 env 不存在。难道我做错了什么?
- c# - 将 JSON 数据插入到动态中的字段(例如联系人中的名字)中
- c++ - 如何解决没有上下文类型信息错误的重载函数?
- html - 谷歌浏览器不显示数据列表标签