apache-spark - 如何在 Spark join 中配置 PoolingOption
问题描述
我正在使用结构化流 2.4 并尝试使用 foreachBatch 接收器写入一个节点 Cassandra,例如:
foreachBatch { (df, batchId) =>
df
.rdd
.repartitionByCassandraReplica("ks", "tbl")
.leftJoinWithCassandraTable("ks", "tbl")
.on(SomeColumns("id"))
.map(...)
.toDF(...)
.write
.cassandraFormat("tbl", "ks")
.mode("Append")
.save()
当 df 有 100 万行时,spark 可以将一些行写入 Cassandra,然后抛出:
WARN QueryExecutor: BusyPoolException ... Retrying
接着:
ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@
卡桑德拉在那之后停了下来。因此,如果我想配置诸如 PoolingOptions 之类的东西,任何人都可以提供一些有关如何在 foreachBatch 接收器中配置 Datastax 连接器的示例吗?
谢谢
解决方案
推荐阅读
- swift - 如何扩展 UISlider 以便它使用检测滑块值何时更改的回调?
- excel - 在用户窗体打开时选择单元格
- corda - 在 deployNodes 期间设置 Corda postgres 表
- python - 使用正则表达式的验证时间格式
- kivy - 如何在 Kivy 中使用希伯来字体?
- javascript - 导航栏不会在第一次点击时更新活动 li
- javascript - 执行 HTTP.put 请求后停止“导航到”
- python - 如何使用命令行备份 postgres 数据库
- shopify - Shopify LinkList 循环使用动态变量
- github - 错误:未能推送一些参考/github/visual Studio