apache-kafka - Alpakka Kafka 流永远不会被终止
问题描述
我们正在使用 Alpakka Kafka 流来消费来自 Kafka 的事件。以下是流的定义方式:
ConsumerSettings<GenericKafkaKey, GenericKafkaMessage> consumerSettings =
ConsumerSettings
.create(actorSystem, new KafkaJacksonSerializer<>(GenericKafkaKey.class),
new KafkaJacksonSerializer<>(GenericKafkaMessage.class))
.withBootstrapServers(servers).withGroupId(groupId)
.withClientId(clientId).withProperties(clientConfigs.defaultConsumerConfig());
CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
.withMaxBatch(20)
.withMaxInterval(Duration.ofSeconds(30));
Consumer.DrainingControl<Done> control =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topics))
.mapAsync(props.getMessageParallelism(), msg ->
CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
.thenCompose(param -> CompletableFuture.supplyAsync(() -> msg.committableOffset())))
.toMat(Committer.sink(committerSettings), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(materializer);
这是关闭流的一段代码:
CompletionStage<Done> completionStage = control.drainAndShutdown(actorSystem.dispatcher());
completionStage.toCompletableFuture().join();
我也尝试在可完成的未来上做一个得到。但无论是加入还是获得未来都不会返回。有没有其他人也遇到过类似的问题?我在这里做错了什么吗?
解决方案
如果要从流外部控制流终止,则需要使用 KillSwitch :https ://doc.akka.io/docs/akka/current/stream/stream-dynamic.html
推荐阅读
- c# - 将 Windows 打印机驱动程序与“本机”打印机条形码渲染相结合?
- java - 我是否在 Java 中错误地执行了我的操作顺序?
- .net - AcquireTokenAsync 未提供在 Bot V4 中需要很长时间的任何结果
- python - 有没有把字符串变成实际操作的功能?
- angular - 使用 TweenMax 库的 @angular/cli 项目中的错误
- c++ - meta-qt5 do_populate_sdk 不会在 yocto zeus 上创建 qmake
- amazon-web-services - AWS Lambda:代码编辑器不支持 .NET Core 3.1 (C#/PowerShell) 运行时
- javascript - ToggleButton - 如何取消切换?
- android - 仅在arrayadapter中按大于和小于android的值显示列表的子集
- python - AttributeError:模块“osmnx”没有属性“project_gdf”