首页 > 解决方案 > Spring Boot Kafka Streams - 绑定问题

问题描述

我正在尝试使用此示例构建一个基于 Kafka Streams 的简单流应用程序。

字数

但是,当我启动应用程序时,出现以下错误:有人可以指出我在这里遗漏了什么吗?这是代码,配置和错误

@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }

  @Component
  public static class PersonSource {

    private final MessageChannel personOut;

    @Autowired
    PersonSource(PersonBinding personBinding) {

      this.personOut = personBinding.personOut();
    }

    @Scheduled(fixedDelay = 5000L)
    public void run() {

      Message<Person> message = MessageBuilder
          .withPayload(new Person("John", "Doe", Instant.now()))
          .build();

      try {

        personOut.send(message);

        log.info("Published message: {}", message);
      } catch (Exception e) {

        e.printStackTrace();
        throw e;
      }
    }
  }

  @Component
  public static class PersonProcessor {

    @StreamListener
    public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {

      events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
    }
  }
}

@Data
@AllArgsConstructor
class Person {

  String firstName;

  String lastName;

  Instant createdOn;
}

interface PersonBinding {

  String PERSON_IN = "pin";

  String PERSON_OUT = "pout";

  @Output(PERSON_OUT)
  MessageChannel personOut();

  @Input(PERSON_IN)
  KStream<String, Person> personIn();
}

依赖管理(Spring Boot 1.5.13.RELEASE)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>

配置

# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw

错误

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
  - spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton - binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

** 编辑 1 **

将代码上传到 Github https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src

标签: spring-bootspring-kafka

解决方案


我有一个类似的问题,它在我添加后得到了修复:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

推荐阅读