scala - Akka Streams - 流大表
问题描述
如何使用 Akka Streams 流式传输大表中的所有行?
我需要从关系数据库中流出一些数据。在某些情况下,我会得到大量的 ID。在其他情况下,我需要流出整个表,最多 1e9 行。对于我有(大)PK ID 列表(见下文)的情况,我有一个可行的解决方案。我将 ID 列表拆分为 1000 个元素的批次,一次选择一个批次并返回数据。但是,当我没有 PK ID 列表时,这不适用于需要流式传输整个表的情况。我在想
- 从 my_table 中选择 min(pk_id), max((pk_id)
- 将区间 (min_id, max_id) 拆分为 N 个批次,使得每个批次的大小 < S
- 使用下面的解决方案一次流式传输一个这样的批次
但这似乎很复杂。有没有更好的设计?(这是我的第一个 Akka Streams 项目。)
import java.sql.Connection
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.typesafe.config.Config
import scala.collection.mutable
import scala.util.control.NonFatal
class DbSource(tableName: String, abc: String) extends GraphStage[SourceShape[String]] {
private val baseQry: String = query(tableName, abc)
private val jdbc: Config = appConfig.config.getConfig("jdbc")
private implicit val conn: Connection = ...
private var batchID = 0
private var numRows = 0
private val batches: Seq[Seq[Long]] = idBatches(tableName, idName, pkIDs)
val out: Outlet[String] = Outlet.create(s"$tableName.out")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(out, new AbstractOutHandler() {
override def onPull(): Unit = {
maybeSomeBatch.map(extractBatch).foreach(msg => push(out, msg))
}
})
}
}
private def maybeSomeBatch: Option[Seq[Long]] = {
if (batchID < batches.size) {
batchID += 1
Some(batches(batchID - 1)) // because we have already incremented it
} else None
}
override val shape: SourceShape[String] = SourceShape.of(out)
private def extractBatch(batch: Seq[Long]): String = {
// select from table where ID in batch
// package the data and return as a string
}
}
解决方案
推荐阅读
- html - 如何使html组件居中?
- java - 接口序列化
- java - 如何从 Jenkins 启动 java 服务
- python - 有没有办法让 gensim LDA 自动选择最佳主题数量
- javascript - 将应用程序部署到 Heroku 后无法获取 API 数据(在本地完美运行)
- reactjs - 如何使用 React JS 在 Redirect 上执行函数
- php - 为什么无法识别用于重新登录的 Laravel League/ouath2 令牌
- javascript - 对数组对象内的数组元素执行操作
- php - 将对象数组转换为 Vue js 和 php 中的对象
- javascript - 画布在 60Hz 显示器上闪烁,在 144Hz 显示器上工作正常