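apache-kafka-streams - Can Spring Cloud Stream have multiple @StreamListener methods?
Question
I am using Spring Cloud Stream with KStream. I tested one topic with a single @StreamListener, and it worked fine.
I then changed the code to take two KStream inputs (two @StreamListener methods), but the application fails to start with this error: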
***************************
APPLICATION FAILED TO START
***************************
Description:
The bean 'stream-builder-process', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Process finished with exit code 1
First listener

    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC1_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic1Source {

        // Prints every record value read from the topic1 input binding.
        @StreamListener
        public void process(@Input(TOPIC1_IN) KStream<String, GenericRecord> logs) {
            logs.foreach((key, value) -> System.out.println("Test Topic1 : " + value));
        }
    }
With only the first listener, the application works.

Second listener

    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC2_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic2Source {

        // Prints every record value read from the topic2 input binding.
        @StreamListener
        public void process(@Input(TOPIC2_IN) KStream<String, GenericRecord> logs) {
            logs.foreach((key, value) -> System.out.println("Test Topic2 : " + value));
        }
    }
With the second listener added, the error above occurs.
Application properties
spring.application.name=kafka-streams-test
spring.kafka.bootstrap-servers=my brokers
# defaults
spring.cloud.stream.kafka.streams.binder.brokers=my brokers
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=my server
# topic1
spring.cloud.stream.bindings.topic1In.destination=topic1
spring.cloud.stream.bindings.topic1In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic1In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
# topic2
spring.cloud.stream.bindings.topic2In.destination=topic2
spring.cloud.stream.bindings.topic2In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic2In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
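Solution
I found the cause of the error: both listener methods have the same name, "process". The bean named in the error message, 'stream-builder-process', contains the method name, so two @StreamListener methods with the same name clash on that bean name.
Giving each method a unique name (for example, processTopic1 and processTopic2) avoids the clash.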
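To illustrate why identical method names clash, here is a minimal, self-contained Java sketch. It does not use Spring. It models a registry that keys each listener under "stream-builder-" plus the method name, which is what the bean name in the error message suggests. The class BeanNameCollisionDemo, the Topic1SourceFixed and Topic2SourceFixed stand-ins, and the method names processTopic1 and processTopic2 are hypothetical. The naming rule is an assumption inferred from the error message, not code taken from the binder.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanNameCollisionDemo {

    // Stand-ins for the two listeners in the question: both methods are named "process".
    public static class Topic1Source { public void process() { } }
    public static class Topic2Source { public void process() { } }

    // Hypothetical fix: each listener method gets a unique name.
    public static class Topic1SourceFixed { public void processTopic1() { } }
    public static class Topic2SourceFixed { public void processTopic2() { } }

    /**
     * Simplified model (an assumption, not the binder's code): one registry entry
     * per listener method, keyed by "stream-builder-" + method name.
     * Throws if a key is registered twice, like a bean registry with overriding disabled.
     */
    public static Map<String, String> register(Class<?>... listeners) {
        Map<String, String> registry = new LinkedHashMap<>();
        for (Class<?> listener : listeners) {
            for (Method method : listener.getDeclaredMethods()) {
                if (method.isSynthetic()) {
                    continue; // skip compiler- or tool-generated methods
                }
                String beanName = "stream-builder-" + method.getName();
                String previous = registry.putIfAbsent(beanName, listener.getSimpleName());
                if (previous != null) {
                    throw new IllegalStateException("The bean '" + beanName
                            + "' is already defined by " + previous);
                }
            }
        }
        return registry;
    }

    /** Returns true when registering the given listeners produces a name clash. */
    public static boolean collides(Class<?>... listeners) {
        try {
            register(listeners);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("same method names collide: "
                + collides(Topic1Source.class, Topic2Source.class));
        System.out.println("bean names after renaming: "
                + register(Topic1SourceFixed.class, Topic2SourceFixed.class).keySet());
    }
}
```

In the real application, the corresponding change would be to rename the two @StreamListener methods so they differ, leaving the bindings and properties as they are.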