java - 在 spring cloud 中使用 StreamBridge 或 kafkaStreams API 方法
问题描述
我正在使用spring-cloud-stream项目来使用 Kafka Streams,我对此并不陌生。由于建议使用函数式编程,我将函数定义为打击:
@Configuration
public class StreamConfiguration {
@Bean
public Consumer<KStream<String, PaymentRequest>> orderProcess() {
return stream ->
stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
.peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
.branch(Named.as("RejectionCheck"), rejectionCheckPredicate, (key, value) -> true)
...
}
}
关键是因为我在流中有多个分支,并且每个分支路径都有唯一的目的地,所以不可能使用 Function 来使用 orderProcess-out-0: destination: name
. 我注意到我可以to(destination)
像吹一样使用 Kafka Streams:
stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
.peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
...
.to("destination");
或使用StreamBridge
这样的:
stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
.peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
...
.peek((k,v) -> streamBridge("bindName-out-0", v));
哪一种是处理这种情况的正确方法?
这两种方法是否保留了 Kafka Streams exact_once_beta事务模式或破坏它?
我注意到的不同之处在于,第一种方法会导致从 Kafka 代理 server.properties 推断出的目标分区数,但后者使 spring 使用 application.yml 分区配置创建主题。
解决方案
是的,对于第二个,Spring 从绑定属性中提供主题。
只要第二个支持 beta 版本,只要 broker 是 2.5 或更高版本并且 spring-kafka 是 2.6.x (或 2.5.x 容器EOSMode
设置为BETA
.
它们在功能上是相同的。
推荐阅读
- python - TypeError:一元+的错误操作数类型:'str'当我认为代码正确时
- spring-boot - 当消费者的处理时间超过 max.poll.interval.ms 时,消费者没有下线
- python - 我想在 profile.html 模板-django 中显示注册人的详细信息,例如(名字、电子邮件、电话号码)
- json - 如何修复“预计解码字典
但找到了一个数组。”,基础错误:无 - laravel - SQLSTATE[HY000]: 一般错误: 2031 (SQL: select * from `work_permits` where `user_id` = ?) 使用 laravel
- plotly - 如何创建子图并使用破折号回调进行更新
- julia - 在 Julia 语言的代码括号示例中看到的 `julia>` 是什么意思?
- javascript - 获取给函数 js 的输入数量
- gradle - 实际移动应用开发项目中的 Appium 测试
- javascript - ReactJS:如何正确链接到下载文件?