scala - 将数据帧从 spark 集群写入 cassandra 集群:分区和性能调优
问题描述
我有两个集群 - 1. Cloudera Hadoop- Spark 作业在这里运行 2. Cloud - Cassandra 集群,多个 DC
在将数据帧从我的 spark 作业写入 cassandra 集群时,我在写入之前在 spark 中进行了重新分区(repartionCount=10)。见下文:
import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
.mode(SaveMode.Append)
.options(options)
.option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
.option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
.save()
在我的多租户 spark 集群中,对于具有 20M 记录的 spark 批处理加载,在配置下,我看到很多任务失败、资源抢占和运行失败。
spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20
spark.cassandra.connection.compression=LZ4
我应该如何调整这个?重新分配是罪魁祸首吗?
PS:我一开始的理解是:对于 20M 行的负载,“重新分区”应该将负载均匀地分布在 executor 上(每个分区有 2M 行),并且批处理将在这些分区级别(在 2M 行上)完成。但是现在,我怀疑这是否会导致不必要的洗牌,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)上进行批处理。
更新:删除“重新分区”大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 - spark.sql.shuffle.partitions: 200
),所以我挖得更深一点,发现我最初的理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,所以我决定让它保持不变。正如亚历克斯所建议的那样,我已经减少了并发写入,我相信这应该会有所帮助。
解决方案
您不需要在 Spark 中进行重新分区 - 只需将数据从 Spark 写入 Cassandra,不要尝试更改 Spark Cassandra 连接器默认值 - 它们在大多数情况下都可以正常工作。您需要查看发生了什么样的阶段故障 - 很可能您只是因为spark.cassandra.output.concurrent.writes=20
(使用默认值 ( 5
))而导致 Cassandra 超载 - 有时编写器越少有助于更快地写入数据,因为您不会超载 Cassandra,而作业不是t 重新启动。
PSpartition
中的spark.cassandra.output.batch.grouping.key
- 它不是Spark分区,它是Cassandra分区,取决于分区键列的值。
推荐阅读
- android - Android深色主题只是使颜色变暗而不是应用夜间主题颜色
- xamarin - Visual Studio for Mac Community 8.9.1(内部版本 34)不显示 Xamarin 项目
- javascript - 更改网页上所有项目的比例
- r - 如何将 ggplot stat_summary 几何设置为特定的 y 值?
- kubernetes - Kubernetes Nginx 入口速率限制
- php - 如何在 PHP 中进行更新?
- javascript - Discord.JS 无法识别投票中的第一个选项
- python - 如何将多个 CSV 文件合并为一个文件并使用 python 在最终的 CSV 文件中创建超级模式
- python - Python分配给切片vs从末尾删除?
- node.js - 当我从命令行禁用屏幕截图时,我的 testcafe 浏览器没有关闭