首页 > 解决方案 > 如何在 Akka 中将刻度源与内部源流结合起来

问题描述

我正在寻找一个调度程序,它为每个给定的时间段打勾并从多个表中流式传输数据。如果下游水槽变慢,我希望ticker也放慢速度。有了它,我可以提供背压。

所以总的来说,我结合了一个解决方案,但我结束Source[Source[_]]了,因为外部 Source 的工作独立于内部 Source 性能。

这是代码的简化版本,我创建了 Slick 源以从 DB 中获取数据;

val source = Source.tick(initialDelay.second, 1.second, "ss")
    .map(_ => getSegments)
    .mapConcat { case segments =>
      segments.map { s =>

        val newUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_data".as[Int]).map(id => (id, s.newData))
       
        val allUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_someotherdata".as[Int]).map(id => (id, s.wholeSegment))
        
        Source.combine(newUsers, allUsers)(Merge(_))
      }
    }

最终我想从多个表中流式传输数据。

谢谢你。

标签: slickakka-streamalpakka

解决方案


推荐阅读