scala - 通过 MemSQL 中的分区下推在 Spark 中启用并行化
问题描述
我在 MemSQL 中有一个列存储表,其架构类似于以下架构:
CREATE TABLE key_metrics (
source_id TEXT,
date TEXT,
metric1 FLOAT,
metric2 FLOAT,
…
SHARD KEY (source_id, date) USING CLUSTERED COLUMNSTORE
);
我有一个查询 MemSQL 表的 Spark 应用程序(与 Spark Job Server 一起运行)。下面是我正在做的那种 Dataframe 操作的简化形式(在 Scala 中):
sparkSession
.read
.format(“com.memsql.spark.connector”)
.options( Map (“path” -> “dbName.key_metrics”))
.load()
.filter(col(“source_id”).equalTo(“12345678”)
.filter(col(“date”)).isin(Seq(“2019-02-01”, “2019-02-02”, “2019-02-03”))
通过查看物理计划,我已经确认这些过滤谓词被下推到 MemSQL。
我还检查了表中的分区分布是否相当均匀:
±--------------±----------------±-------------±-------±-----------+
| DATABASE_NAME | TABLE_NAME | PARTITION_ID | ROWS | MEMORY_USE |
±--------------±----------------±-------------±-------±-----------+
| dbName | key_metrics | 0 | 784012 | 0 |
| dbName | key_metrics | 1 | 778441 | 0 |
| dbName | key_metrics | 2 | 671606 | 0 |
| dbName | key_metrics | 3 | 748569 | 0 |
| dbName | key_metrics | 4 | 622241 | 0 |
| dbName | key_metrics | 5 | 739029 | 0 |
| dbName | key_metrics | 6 | 955205 | 0 |
| dbName | key_metrics | 7 | 751677 | 0 |
±--------------±----------------±-------------±-------±-----------+
我的问题是关于分区下推。据我了解,有了它,我们可以使用机器的所有内核并利用并行性进行批量加载。根据文档,这是通过创建与 MemSQL 数据库分区一样多的 Spark 任务来完成的。
然而,当运行 Spark 管道并观察 Spark UI 时,似乎只创建了一个 Spark 任务,它对在单个内核上运行的数据库进行单个查询。
我已确保还设置了以下属性:
spark.memsql.disablePartitionPushdown = false
spark.memsql.defaultDatabase = “dbName”
我对分区下推的理解不正确吗?我还缺少其他一些配置吗?
非常感谢您对此的意见。
谢谢!