spring-boot - Spring Boot Kafka Streams - 绑定问题
问题描述
我正在尝试使用此示例构建一个基于 Kafka Streams 的简单流应用程序。
但是,当我启动应用程序时,出现以下错误:有人可以指出我在这里遗漏了什么吗?这是代码,配置和错误
@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Component
public static class PersonSource {
private final MessageChannel personOut;
@Autowired
PersonSource(PersonBinding personBinding) {
this.personOut = personBinding.personOut();
}
@Scheduled(fixedDelay = 5000L)
public void run() {
Message<Person> message = MessageBuilder
.withPayload(new Person("John", "Doe", Instant.now()))
.build();
try {
personOut.send(message);
log.info("Published message: {}", message);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}
@Component
public static class PersonProcessor {
@StreamListener
public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {
events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
}
}
}
@Data
@AllArgsConstructor
class Person {
String firstName;
String lastName;
Instant createdOn;
}
interface PersonBinding {
String PERSON_IN = "pin";
String PERSON_OUT = "pout";
@Output(PERSON_OUT)
MessageChannel personOut();
@Input(PERSON_IN)
KStream<String, Person> personIn();
}
依赖管理(Spring Boot 1.5.13.RELEASE)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
配置
# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw
错误
Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
- spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton - binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]
Action:
Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
** 编辑 1 **
将代码上传到 Github https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src
解决方案
我有一个类似的问题,它在我添加后得到了修复:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
推荐阅读
- asp-classic - CDO.Message 收件人和收件人字段更改为“友好名称”和电子邮件地址
- javascript - 如何将来自 DOM 的用户输入存储在 Javascript 变量中?
- python - 如何识别和去除乱码?
- coffeescript - Atom 编辑器中特定关键字的语法高亮显示
- node.js - nodejs mssql 连接 ETIMEDOUT
- google-sheets - Sumif 在可变日期之间具有多个条件
- assembly - 将指向 Rust 数组的指针传递到 x86-64 Asm -- 指针减一
- c - 错误:类型冲突,错误:形式参数 2 的类型不完整
- javascript - Javascript Onclick 滚动和高亮示例
- android - 一起使用 ARCore 和 Vision SDK Android