java - Spring Cloud Stream 3.0 与 kafka 消费者在批处理模式下获取列表中的单个记录而不是更多
问题描述
尝试使用 Spring Cloud Stream 3.0 以批处理模式使用 kafka 消息。
消费者收到一个包含单个记录的列表,而不是更多。
下面是 yml ,使用的消费者编码
spring:
cloud:
stream:
bindings:
process-in-0:
destination: person-command
consumer:
# maxAttempts: 1
batch-mode: true
properties:
maxPollRecords: 10
minFetchBytes: 5000
fetchMaxWaitMs: 1000
消费者代码
@Transactional
@Bean
public Function<List<PersonEvent>, List<PersonEvent>> process() {
return pel ->{
List<Person> lstPerson = new ArrayList<Person>();
List<PersonEvent> lstPersonEvent = new ArrayList<PersonEvent>();
for (PersonEvent personEvent : pel) {
Person person = new Person();
person.setName(personEvent.getName());
lstPerson.add(person);
personEvent.setType("PersonSaved");
lstPersonEvent.add(personEvent);
}
logger.info("Person Size {}"+lstPerson.size());
Iterable<Person> savedPerson = repository.saveAll(lstPerson);
logger.info("Saved Person Size {}"+lstPerson.size());
return lstPersonEvent;
};
}
输出:日志显示在列表中获取了一条记录,而不是我们需要一批 10 条记录
2020-01-05 15:11:49.044 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:49.054 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1
2020-01-05 15:11:50.045 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:50.053 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1
解决方案
properties
消费者下没有这样的财产。无论如何,Kafka 属性并不常见,需要指定为 Kafka 特定的。
请参阅文档。
此外,Spring Boot 对任意 Kafka 属性一无所知,不会对它们执行驼峰式转换。请参阅Kafka Binder 文档。
尝试
spring:
cloud:
stream:
kafka:
bindings:
process-in-0:
consumer:
configuration:
max.poll.records: 10
min.fetch.bytes: 5000
fetch.max.wait.ms: 1000
您可以通过检查 Kafka 客户端输出的 INFO 日志来确认属性设置是否符合预期。
推荐阅读
- python - 需要逐行读取大txt文件的功能,如果没有更多行,则打开文件并再次迭代
- excel - 带有表引用的 Excel 条件下拉列表
- arrays - 我可以使用 switch 语句来转换 Swift 中数组中的每个值吗?
- swift - 如何在 Swift 中符合 StringProtocol?
- javascript - VSCode 忽略特定文件中的 javascript 异常
- google-cloud-functions - 如何在 dag 云作曲家中从外部触发任务
- excel - 无法完成操作:表格不能与数据透视表、查询结果、表格、合并单元格或 XML 映射重叠
- node.js - 是否可以通过 Axios 请求实现 onAuthStateChanged() ?
- android - Rust Android 开发 \ndk\toolchains\llvm\prebuilt\ 不存在
- apache-spark - pyspark - 将 UDF 应用于列列表并返回多个数据帧