首页 > 解决方案 > 将数据帧从 spark 集群写入 cassandra 集群:分区和性能调优

问题描述

我有两个集群 - 1. Cloudera Hadoop- Spark 作业在这里运行 2. Cloud - Cassandra 集群,多个 DC

在将数据帧从我的 spark 作业写入 cassandra 集群时,我在写入之前在 spark 中进行了重新分区(repartionCount=10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户 spark 集群中,对于具有 20M 记录的 spark 批处理加载,在配置下,我看到很多任务失败、资源抢占和运行失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我应该如何调整这个?重新分配是罪魁祸首吗?

PS:我一开始的理解是:对于 20M 行的负载,“重新分区”应该将负载均匀地分布在 executor 上(每个分区有 2M 行),并且批处理将在这些分区级别(在 2M 行上)完成。但是现在,我怀疑这是否会导致不必要的洗牌,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)上进行批处理。

更新:删除“重新分区”大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 - spark.sql.shuffle.partitions: 200),所以我挖得更深一点,发现我最初的理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,所以我决定让它保持不变。正如亚历克斯所建议的那样,我已经减少了并发写入,我相信这应该会有所帮助。

标签: scalaapache-sparkcassandradatastax-java-driverspark-cassandra-connector

解决方案


您不需要在 Spark 中进行重新分区 - 只需将数据从 Spark 写入 Cassandra,不要尝试更改 Spark Cassandra 连接器默认值 - 它们在大多数情况下都可以正常工作。您需要查看发生了什么样的阶段故障 - 很可能您只是因为spark.cassandra.output.concurrent.writes=20(使用默认值 ( 5))而导致 Cassandra 超载 - 有时编写器越少有助于更快地写入数据,因为您不会超载 Cassandra,而作业不是t 重新启动。

PSpartition中的spark.cassandra.output.batch.grouping.key- 它不是Spark分区,它是Cassandra分区,取决于分区键列的值。


推荐阅读