首页 > 解决方案 > 如何从 Cassandra 流式传输所有记录?

问题描述

我需要从 Cassandra 流式传输所有记录。目前我正在使用akka-persistence-cassandra流式传输数据:

val querier =
        PersistenceQuery(system)
          .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      
      val selectDistinctPersistenceIds = new SimpleStatement(
      "SELECT DISTINCT persistence_id, partition_nr FROM messages")
        .setFetchSize(100000)

        querier.session.select(selectDistinctPersistenceIds).map { row =>
          val id = row.getString(0)
          id
        }

当记录数约为 150 万条时,这可以正常工作。但是当记录数超过 150 万条记录时,我会收到read timeout错误消息。

我在用:

"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"

编辑:错误日志:

com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
    at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
...

标签: scalacassandraakka

解决方案


问题在于您的驱动程序会话设置根据您的需要进行调整。

可能是间隙超时问题 或增加重试次数。和超时设置。


推荐阅读