首页 > 解决方案 > 在 spring cloud 中使用 StreamBridge 或 kafkaStreams API 方法

问题描述

我正在使用spring-cloud-stream项目来使用 Kafka Streams,我对此并不陌生。由于建议使用函数式编程,我将函数定义为打击:

@Configuration
public class StreamConfiguration {

    @Bean
    public Consumer<KStream<String, PaymentRequest>> orderProcess() {
         return stream ->
                 stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                .branch(Named.as("RejectionCheck"), rejectionCheckPredicate, (key, value) -> true)
                ...
    }
}

关键是因为我在流中有多个分支,并且每个分支路径都有唯一的目的地,所以不可能使用 Function 来使用 orderProcess-out-0: destination: name. 我注意到我可以to(destination)像吹一样使用 Kafka Streams:

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                ...
                .to("destination");

或使用StreamBridge这样的:

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                ...
                .peek((k,v) -> streamBridge("bindName-out-0", v));

哪一种是处理这种情况的正确方法?

这两种方法是否保留了 Kafka Streams exact_once_beta事务模式或破坏它?

我注意到的不同之处在于,第一种方法会导致从 Kafka 代理 server.properties 推断出的目标分区数,但后者使 spring 使用 application.yml 分区配置创建主题。

标签: javaapache-kafkaspring-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


是的,对于第二个,Spring 从绑定属性中提供主题。

只要第二个支持 beta 版本,只要 broker 是 2.5 或更高版本并且 spring-kafka 是 2.6.x (或 2.5.x 容器EOSMode设置为BETA.

它们在功能上是相同的。


推荐阅读