apache-kafka - 带有函数式编程的 spring-cloud-stream
问题描述
在我的应用程序中,streamlistner 从多个主题中消费了 kafka 消息,但我想为一个主题添加功能性消费者 bean,这可能是一些主题由 streamlistner 消费,而一些主题与消费者 bean 在同一个应用程序中?当我尝试创建的主题仅由streamlistner 消费而不是为消费者bean 创建?
应用程序.yml
spring:
cloud:
function:
definition: printString
stream:
kafka:
binder:
brokers: localhost:9092
zkNodes: localhost:2181
autoCreateTopics: true
autoAddPartitions: true
bindings:
printString-in-0:
destination: function-input-topic
group: abc
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2
xyz-input:
group: abc
destination: xyz-input-input
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2
主题是为 xyz 输入主题创建的,但不是为函数输入主题创建的
消费豆
import java.util.function.Consumer;
@Component
public class xyz {
@Bean
Consumer<String> printString() {
return System.out::print;
}
}
kafkaConfig 接口
public interface KafkaConfig {
@Input("xyz-input")
SubscribableChannel inbound();
}
解决方案
不,我们过去尝试过,但是当两种编程模式发生冲突时会出现一些微妙的问题。将任何StreamListener
内容分解为功能方式(主要是删除代码)非常简单,并且应用程序实际上变得更加简单。
一起摆脱KafkaConfig
界面,摆脱,@EnableBinding
你应该没问题。
推荐阅读
- java - 在java中的数字后添加空格
- python-3.x - 尽管给定完整路径,但图像未显示
- gitlab - 提交页面上的“等待”指示器是什么?
- javascript - 如何获得2个字符串之间的所有匹配项?
- rpm - rpm/yum:yum update 在内部如何处理重叠文件和 yum 意外终止
- bash - 如何在 Bash 版本 3 中获取 BASHPID
- database - 是否直接查询交互历史表以进行 pega 决策
- javascript - 谷歌函数:TypeError:无法读取未定义的属性“名称”
- javascript - 在javascript中过滤对象数组
- c# - 我想运行 if 语句几秒钟。我该怎么做?