akka-stream - 如何使用 akka-streams 实现分页
问题描述
我需要逐行处理大文件并在每个项目上做一些繁重的工作(在 4 核 cpu 上),我认为代码正确:
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val sink = Sink.foreach[String](elem => println("element proceed"))
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.mapAsync(4)(v =>
//long op
Future {
Thread.sleep(500)
"updated_" + v
})
.to(sink)
.run()
但我想要输出如下:
100 element proceed
200 element proceed
300 element proceed
357 element proceed. done
如何实施?
解决方案
您可以使用Flow.grouped
:
val groupSize = 100
val groupedFlow = Flow[String].grouped(groupSize)
现在可以在您的之前或之后注入此流程mapAsync
:
FileIO.fromPath(Paths.get("file.txt"))
.via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
.via(groupedFlow)
...
推荐阅读
- python - 在产生第一个项目后如何使用不同的参数再次调用第一个解析函数以产生另一个项目
- arrays - Rust“未实现特征 `std::array::LengthAtMost32`”
- php - 如何从 Laravel Homestead 的存储文件夹中访问图像(符号链接问题)?
- java - 如何在 TreeMap 中将当前类的实例作为值返回?
- ios - 无法发出将 tokenID 插入 request_token 字段的 POST 请求
- python - 查找具有相同字符串值的组的索引
- javascript - 如何使用 BeautifulSoup 从 indiegogo 抓取项目 url?
- python - 想要根据位置获取产品可用数量(odoo 12)
- python - Python NLTK 删除不属于 URL 的内部标点符号
- python - 使用“trepan3k”时无法从“xdis”导入名称“iscode”