scala - 如何从 Cassandra 流式传输所有记录?
问题描述
我需要从 Cassandra 流式传输所有记录。目前我正在使用akka-persistence-cassandra
流式传输数据:
val querier =
PersistenceQuery(system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val selectDistinctPersistenceIds = new SimpleStatement(
"SELECT DISTINCT persistence_id, partition_nr FROM messages")
.setFetchSize(100000)
querier.session.select(selectDistinctPersistenceIds).map { row =>
val id = row.getString(0)
id
}
当记录数约为 150 万条时,这可以正常工作。但是当记录数超过 150 万条记录时,我会收到read timeout
错误消息。
我在用:
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"
编辑:错误日志:
com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
...
解决方案
问题在于您的驱动程序会话设置根据您的需要进行调整。
可能是间隙超时问题 或增加重试次数。和超时设置。
推荐阅读
- clojure - Clojure Loop 只能从尾部位置重复
- python - 在熊猫的loc方法中动态更改行索引
- css - 为什么这个 css 在演示中工作,但不在我的项目中?
- list - Haskell - 给定两个整数,找到所有子列表
- android - 在 Android 的 EditText 中设置文本时出错
- python - 尝试安装 Pyro 库时出错
- handle - noUiSlider 手柄有两条小竖线,造型后不会消失
- php - 致命错误:更新 wordpress 后函数名称必须是字符串
- node.js - 无法从服务器获得实际响应
- python - 方法 strip() 没有从我的字符串中删除字符