slick - 如何在 Akka 中将刻度源与内部源流结合起来
问题描述
我正在寻找一个调度程序,它为每个给定的时间段打勾并从多个表中流式传输数据。如果下游水槽变慢,我希望ticker也放慢速度。有了它,我可以提供背压。
所以总的来说,我结合了一个解决方案,但我结束Source[Source[_]]
了,因为外部 Source 的工作独立于内部 Source 性能。
这是代码的简化版本,我创建了 Slick 源以从 DB 中获取数据;
val source = Source.tick(initialDelay.second, 1.second, "ss")
.map(_ => getSegments)
.mapConcat { case segments =>
segments.map { s =>
val newUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_data".as[Int]).map(id => (id, s.newData))
val allUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_someotherdata".as[Int]).map(id => (id, s.wholeSegment))
Source.combine(newUsers, allUsers)(Merge(_))
}
}
最终我想从多个表中流式传输数据。
谢谢你。
解决方案
推荐阅读
- html - 使用 flex-direction: row 使 flexbox 子填充可用的垂直空间
- vue.js - v-model 没有在 Android Chrome 浏览器上的输入更新
- java - 如何选择使用 angularjs 下载文件并将其发送到 java 类的路径?
- python-3.x - 在python中搜索二维列表中的出现
- r - 我在 R 中绘制我的数据,但图例不存在,尽管我在命令行中输入了
- java - 更改存储在 ArrayList 或 HashMap 中的对象属性的最佳方法是什么?
- php - 如何在 macOS 服务器中升级 PHP 版本?
- laravel - 如何将集合或数组发送到通知门面
- javascript - “this”如何引用父类?
- python - (Host Django Pythonanywhere) Manage.py collectstatic 返回 FileNotFoundError: [Errno 2]