首页 > 解决方案 > Spring Cloud Stream 3.0 与 kafka 消费者在批处理模式下获取列表中的单个记录而不是更多

问题描述

尝试使用 Spring Cloud Stream 3.0 以批处理模式使用 kafka 消息。

消费者收到一个包含单个记录的列表,而不是更多。

下面是 yml ,使用的消费者编码

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: person-command
          consumer:
#            maxAttempts: 1
            batch-mode: true
            properties:
               maxPollRecords: 10
               minFetchBytes: 5000
               fetchMaxWaitMs: 1000

消费者代码

@Transactional
@Bean
public Function<List<PersonEvent>, List<PersonEvent>> process() {


    return pel ->{

        List<Person> lstPerson = new ArrayList<Person>();
        List<PersonEvent> lstPersonEvent = new ArrayList<PersonEvent>();    
        for (PersonEvent personEvent : pel) {
            Person person = new Person();
            person.setName(personEvent.getName());
            lstPerson.add(person);
            personEvent.setType("PersonSaved");
            lstPersonEvent.add(personEvent);

        }
        logger.info("Person Size {}"+lstPerson.size());
        Iterable<Person> savedPerson = repository.saveAll(lstPerson);
        logger.info("Saved Person Size {}"+lstPerson.size());

        return lstPersonEvent;
    };
    }     

输出:日志显示在列表中获取了一条记录,而不是我们需要一批 10 条记录

2020-01-05 15:11:49.044  INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:49.054  INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1
2020-01-05 15:11:50.045  INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:50.053  INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1

标签: javaapache-kafkaspring-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


properties消费者下没有这样的财产。无论如何,Kafka 属性并不常见,需要指定为 Kafka 特定的。

请参阅文档

此外,Spring Boot 对任意 Kafka 属性一无所知,不会对它们执行驼峰式转换。请参阅Kafka Binder 文档

尝试

spring:
  cloud:
    stream:
      kafka:
        bindings:
          process-in-0:
            consumer:
              configuration:
                max.poll.records: 10
                min.fetch.bytes: 5000
                fetch.max.wait.ms: 1000

您可以通过检查 Kafka 客户端输出的 INFO 日志来确认属性设置是否符合预期。


推荐阅读