首页 > 解决方案 > 如何在 Spark join 中配置 PoolingOption

问题描述

我正在使用结构化流 2.4 并尝试使用 foreachBatch 接收器写入一个节点 Cassandra,例如:

foreachBatch { (df, batchId) =>
        df
          .rdd
          .repartitionByCassandraReplica("ks", "tbl")
          .leftJoinWithCassandraTable("ks", "tbl")
          .on(SomeColumns("id"))
          .map(...)
          .toDF(...)
          .write
          .cassandraFormat("tbl", "ks")
          .mode("Append")
          .save()

当 df 有 100 万行时,sp​​ark 可以将一些行写入 Cassandra,然后抛出:

WARN QueryExecutor: BusyPoolException ... Retrying

接着:

ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@

卡桑德拉在那之后停了下来。因此,如果我想配置诸如 PoolingOptions 之类的东西,任何人都可以提供一些有关如何在 foreachBatch 接收器中配置 Datastax 连接器的示例吗?

谢谢

标签: apache-sparkcassandradatastax

解决方案


推荐阅读