首页 > 解决方案 > Akka Streams - 流大表

问题描述

如何使用 Akka Streams 流式传输大表中的所有行?

我需要从关系数据库中流出一些数据。在某些情况下,我会得到大量的 ID。在其他情况下,我需要流出整个表,最多 1e9 行。对于我有(大)PK ID 列表(见下文)的情况,我有一个可行的解决方案。我将 ID 列表拆分为 1000 个元素的批次,一次选择一个批次并返回数据。但是,当我没有 PK ID 列表时,这不适用于需要流式传输整个表的情况。我在想

  1. 从 my_table 中选择 min(pk_id), max((pk_id)
  2. 将区间 (min_id, max_id) 拆分为 N 个批次,使得每个批次的大小 < S
  3. 使用下面的解决方案一次流式传输一个这样的批次

但这似乎很复杂。有没有更好的设计?(这是我的第一个 Akka Streams 项目。)

import java.sql.Connection    
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.typesafe.config.Config    
import scala.collection.mutable
import scala.util.control.NonFatal

class DbSource(tableName: String, abc: String) extends GraphStage[SourceShape[String]] {
  private val baseQry: String = query(tableName, abc)
  private val jdbc: Config = appConfig.config.getConfig("jdbc")
  private implicit val conn: Connection = ...
  private var batchID = 0
  private var numRows = 0
  private val batches: Seq[Seq[Long]] = idBatches(tableName, idName, pkIDs)

  val out: Outlet[String] = Outlet.create(s"$tableName.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(out, new AbstractOutHandler() {
        override def onPull(): Unit = {
          maybeSomeBatch.map(extractBatch).foreach(msg => push(out, msg))
        }
      })
    }
  }

  private def maybeSomeBatch: Option[Seq[Long]] = {
    if (batchID < batches.size) {
      batchID += 1
      Some(batches(batchID - 1)) // because we have already incremented it
    } else None
  }

  override val shape: SourceShape[String] = SourceShape.of(out)

  private def extractBatch(batch: Seq[Long]): String = {
      // select from table where ID in batch
      // package the data and return as a string
  }
}

标签: scalaakkaakka-stream

解决方案


推荐阅读