首页 > 解决方案 > 如何创建多个 Spark 任务来查询 Cassandra 分区

问题描述

我有一个使用 Cassandra 存储的 Spark(带有 Spark Job Server)的应用程序。我当前的设置是client使用master=local[*]. 所以有一个 Spark 执行器,它也是使用机器所有 8 个内核的驱动程序进程。我有一个 Cassandra 实例在同一台机器上运行。

Cassandra 表具有 ((datasource_id, date), clustering_col_1...clustering_col_n) 形式的主键,其中 date 是“2019-02-07”形式的一天,并且是复合分区键的一部分。

在我的 Spark 应用程序中,我正在运行如下查询:

df.filter(col("date").isin(days: _*))

在 Spark 物理计划中,我注意到这些过滤器与“datasource_id”分区键的过滤器一起被推送到 Cassandra CQL 查询。

对于我们最大的数据源,我知道分区大小约为 30MB。所以我在 Spark Job Server 配置中有以下设置:

spark.cassandra.input.split.size_in_mb = 1

但是我注意到 Cassandra 加载步骤中没有并行化。尽管有多个大于 1MB 的 Cassandra 分区,但没有创建额外的 spark 分区。只有一个任务在单个核心上执行所有查询,因此需要约 20 秒来加载对应于约 100 万行的 1 个月日期范围的数据。

我尝试了以下替代方法:

  df union days.foldLeft(df)((df: DataFrame, day: String) => {
    df.filter(col("date").equalTo(day))
  })

这确实为 cassandra 中的每个“日”分区创建了一个 spark 分区(或任务)。但是,对于 cassandra 分区的大小要小得多的较小数据源,这种方法被证明是相当昂贵的,因为它创建了过多的任务,并且由于它们的协调而产生了开销。对于这些数据源,将许多 cassandra 分区集中到一个 spark 分区中是完全可以的。因此,为什么我认为使用spark.cassandra.input.split.size_in_mb配置在处理小型和大型数据源时都证明是有用的。

我的理解错了吗?为了使此配置生效,我还缺少其他什么吗?

PS 我还阅读了有关使用 joinWithCassandraTable 的答案。但是,我们的代码依赖于使用 DataFrame。此外,从 CassandraRDD 转换为 DataFrame 对我们来说不是很可行,因为我们的模式是动态的,不能使用案例类指定。

标签: scalaapache-sparkcassandraapache-spark-sqlspark-cassandra-connector

解决方案


推荐阅读