apache-kafka - 如何控制/暂停子源/流中的akka流流
问题描述
我们使用 akka 流和 alpakka kafka 来使用来自不同主题的消息,使用主题模式订阅。使用 groupBy() 为每个主题创建子源/流。
我们如何暂停/恢复/控制单个主题的消息处理,而不影响其他主题的消息处理。
以下选项不起作用。
使用 RestartSource 将重新启动源并影响所有主题的处理。
使用 RestartFlow 会丢弃失败的元素,也会影响所有主题的处理。
如果我们必须为所有主题重新启动源并且有额外的开销,那么为每个主题创建一个源是资源密集型的。
Consumer
.committableSource(consumerSettings.withGroupId("test-group"),
Subscriptions.topicPattern("/some/pattern"))
.groupBy(5000, msg -> msg.record().topic())
.mapAsync(1, msg -> business(msg).thenApply(response -> {
if (response.status().isFailure()) {
throw new RuntimeException("Boom");
}
return msg.committableOffset();}))
.mapAsync(1, offset -> offset.commitJavadsl());
解决方案
推荐阅读
- linux - /opt/project/Qt/spreadsheet/cell.h:0:注意:未找到相关类。没有产生输出
- android - 找不到:couchbase-lite-android-1.4.1 - ionic android
- javascript - 使用 geojson-vt 在 OpenLayers 中加载矢量切片时出现问题
- javascript - 异步函数在 Nodejs 中的行为不符合预期
- express - Alexa技能发展
- iis - 如何在 IIS 主机中自动化补丁(一些 SVN 提交)?
- azure-devops - Azure DevOps 管道环境变量始终为空
- oracle - Oracle:是否可以在不指定列的情况下使用 CTAS 创建 IOT 的副本?
- typescript - 从 chrome 控制台中提取变量以供以后在单元测试中使用
- powershell - 无法转换为参数所需的类型“System.String”。不支持指定的方法