Alpakka Kafka stream never gets terminated

Problem

We are using an Alpakka Kafka stream to consume events from Kafka. Here is how the stream is defined:

ConsumerSettings<GenericKafkaKey, GenericKafkaMessage> consumerSettings = 
    ConsumerSettings
        .create(actorSystem, new KafkaJacksonSerializer<>(GenericKafkaKey.class), 
                new KafkaJacksonSerializer<>(GenericKafkaMessage.class))
        .withBootstrapServers(servers).withGroupId(groupId)
        .withClientId(clientId).withProperties(clientConfigs.defaultConsumerConfig());
CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
        .withMaxBatch(20)
        .withMaxInterval(Duration.ofSeconds(30));
Consumer.DrainingControl<Done> control = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topics))
        .mapAsync(props.getMessageParallelism(), msg ->
                CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                        .thenCompose(param -> CompletableFuture.supplyAsync(() -> msg.committableOffset())))
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

And this is the code that shuts the stream down:

CompletionStage<Done> completionStage = control.drainAndShutdown(actorSystem.dispatcher());
completionStage.toCompletableFuture().join();

I have also tried calling get() on the CompletableFuture, but neither join() nor get() ever returns. Has anyone else run into a similar problem? Am I doing something wrong here?

Tags: apache-kafka, akka, akka-stream, alpakka

Solution


If you want to control stream termination from outside the stream, you need to use a KillSwitch: https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html
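A minimal, self-contained sketch of that approach. Note the assumptions: an endless `Source.repeat(1)` stands in for `Consumer.committableSource(...)` so the example runs without a Kafka broker, and `RunnableGraph.run(system)` is used, which requires Akka 2.6+ (on 2.5 you would pass a materializer as in the question's code):

```java
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class KillSwitchDemo {

    static Done runDemo() {
        ActorSystem system = ActorSystem.create("demo");

        // An endless source stands in for the Kafka consumer source.
        // KillSwitches.single() materializes a UniqueKillSwitch that can
        // stop the stream from outside it.
        Pair<UniqueKillSwitch, CompletionStage<Done>> pair =
            Source.repeat(1)
                .viaMat(KillSwitches.single(), Keep.right()) // keep the kill switch
                .toMat(Sink.ignore(), Keep.both())
                .run(system);

        // From outside the stream: tell it to stop.
        pair.first().shutdown();

        // With the switch triggered, the stream completes and join() returns.
        Done done = pair.second().toCompletableFuture().join();
        system.terminate();
        return done;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In the Kafka stream from the question, the same `.viaMat(KillSwitches.single(), Keep.right())` stage would be inserted right after `Consumer.committableSource(...)`, and `Keep.both()` at the sink would then carry both the kill switch and the committer's completion stage.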

