scala - 如何创建多个 Spark 任务来查询 Cassandra 分区
问题描述
我有一个使用 Cassandra 存储的 Spark(带有 Spark Job Server)的应用程序。我当前的设置是client
使用master=local[*]
. 所以有一个 Spark 执行器,它也是使用机器所有 8 个内核的驱动程序进程。我有一个 Cassandra 实例在同一台机器上运行。
Cassandra 表具有 ((datasource_id, date), clustering_col_1...clustering_col_n) 形式的主键,其中 date 是“2019-02-07”形式的一天,并且是复合分区键的一部分。
在我的 Spark 应用程序中,我正在运行如下查询:
df.filter(col("date").isin(days: _*))
在 Spark 物理计划中,我注意到这些过滤器与“datasource_id”分区键的过滤器一起被推送到 Cassandra CQL 查询。
对于我们最大的数据源,我知道分区大小约为 30MB。所以我在 Spark Job Server 配置中有以下设置:
spark.cassandra.input.split.size_in_mb = 1
但是我注意到 Cassandra 加载步骤中没有并行化。尽管有多个大于 1MB 的 Cassandra 分区,但没有创建额外的 spark 分区。只有一个任务在单个核心上执行所有查询,因此需要约 20 秒来加载对应于约 100 万行的 1 个月日期范围的数据。
我尝试了以下替代方法:
df union days.foldLeft(df)((df: DataFrame, day: String) => {
df.filter(col("date").equalTo(day))
})
这确实为 cassandra 中的每个“日”分区创建了一个 spark 分区(或任务)。但是,对于 cassandra 分区的大小要小得多的较小数据源,这种方法被证明是相当昂贵的,因为它创建了过多的任务,并且由于它们的协调而产生了开销。对于这些数据源,将许多 cassandra 分区集中到一个 spark 分区中是完全可以的。因此,为什么我认为使用spark.cassandra.input.split.size_in_mb
配置在处理小型和大型数据源时都证明是有用的。
我的理解错了吗?为了使此配置生效,我还缺少其他什么吗?
PS 我还阅读了有关使用 joinWithCassandraTable 的答案。但是,我们的代码依赖于使用 DataFrame。此外,从 CassandraRDD 转换为 DataFrame 对我们来说不是很可行,因为我们的模式是动态的,不能使用案例类指定。
解决方案
推荐阅读
- powershell - 将控制台输出写入文件 - 文件意外为空
- jquery - Amazon S3 文件上传:抛出 403
- azure - 如何将密钥保管库的访问权限授予用户分配的身份?
- android - 使用 SnapHelper 选择项目时在 Viewholder 中调用函数
- mongodb - Mongo聚合,分组,最大值并取整行具有最大值
- android - java.lang.ClassNotFoundException:在路径上找不到类“com.google.firebase.provider.FirebaseInitProvider”:DexPathList
- database - 将具有单个数据库的 laravel 项目迁移到多个库
- python - Python 尝试通过 Hotmail 发送电子邮件时出错。暂停
- oracle - 无法从 SQL Developer 检索乌尔都语/阿拉伯语数据
- django - 在 Django/Postgres 中存储挂钟日期时间