akka - 添加流后流完成
问题描述
我有一个简单的流声明如下:
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(30), Files.list(rootDir).collect(Collectors.toList()))
.mapConcat(files -> files)
.log("scanning logs")
.via(logsFlow.create())
.via(kafkaFlow.create())
// .via(archiveFlow.create())
.runWith(Sink.ignore(), materializer)
.whenComplete((a, b) -> {
log.info("done");
});
使用 archiveFlow 注释掉一切都按预期工作。但是,当我添加额外的流程时,无论是存档流程还是像这样的一些简单流程:
.via(Flow.of(Path.class).map(path -> {
log.info("foo");
return path;
}))
流在第一次滴答后完成。这是为什么?
2019-03-20 21:35:09.292 DEBUG 50089 --- [lt-dispatcher-2] a.kafka.internal.DefaultProducerStage : Stage completed
2019-03-20 21:35:09.294 DEBUG 50089 --- [lt-dispatcher-4] akka.stream.Materializer : [scanning logs] Downstream finished.
2019-03-20 21:35:09.296 INFO 50089 --- [onPool-worker-3] com.example.MyStream : done
解决方案
原来是有一个错误被Akka吞噬了。我使用了监督策略,现在一切正常。
推荐阅读
- r - 在 R 中重新组织数据框?
- python - 如何排斥像ggrepel这样的python图上的文本?
- c - 如何同步我的两个线程,每个线程都对共享资源使用 while 循环?
- blockchain - EIP170 - 哪些合约功能最能解释其规模?
- html - 如何从同一位置开始在不同行中的文本?HTML/CSS
- python - 在 python 中使用库给出警告我无法关闭
- ios - IOS/x-code/AutoLayout 自动布局系统错误
- c# - 无法使用包含的 Linq 查询方法
- javascript - 使用 Electron/Webdriver 拖放
- azure-devops - Azure DevOps 发布管道 - 允许在发布时选择分支