apache-kafka - 使用 Spring Cloud Stream Kafka Binder 在批处理功能中发布多条消息
问题描述
我正在寻找一个使用spring cloud stream kafka binder(没有Kafka Streams)创建功能样式处理器的示例,该处理器可以使用来自一个主题的一批n条消息并将m条消息发布到另一个主题(m < n)。我尝试了以下方法:
public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
return ((batch) -> {
Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
.collect(Collectors.groupingBy(result -> IncomingEvent::key));
List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
aggregationResults.add(
MessageBuilder.withPayload(outgoingEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
.build()
);
}
return aggregationResults;
});
}
但是,这会产生带有消息数组的单个事件。我尝试将函数的返回类型从 List<Message> 更改为 Flux<Message>,然后返回 Flux.fromIterable(aggregationResults),这似乎是在发布多条消息,但这些消息似乎是带有属性的 Flux 的序列化实例scanAvailable 和预取而不是实际的消息。我在网上找不到任何实现这一目标的例子。看到这样的例子会很有帮助。
解决方案
我认为不支持。使用Consumer<List<IncomingEvent>>
和使用StreamBridge
发布出站消息。
编辑
看来我错了;见https://github.com/spring-cloud/spring-cloud-stream/issues/2143
这是一个文档请求。云流框架支持一个未记录的特性来发布一批消息使用方法签名,如
public Function<Whatever, List<Message<POJO>>> myMethod()
. 这导致列表中的每条消息都由活页夹单独发布。
如果它不适合你,我建议你对这个问题发表评论。
推荐阅读
- jquery - Datetimepicker 不是 Vue js 中的函数
- android - 如何将文本放置在抽屉标题的左下角并降低抽屉标题的高度?
- node.js - 保护 JWT 的最佳方法是什么?
- html - 用于 html 中的多个 Linux 服务器运行状况报告的 Shell 脚本,列颜色根据条件变化
- python - 如何在 Python 中为类包含可变的“标签”
- r - 在具有 NA 值的矩阵上使用 rcorr() 时出错
- xml - 我们可以使用 XML Schema (XSD) 添加我们自己的实体(特殊字符),例如“á”和“œ”吗?
- angular - 具有三个选择字段的 Angular 9 反应性出生日期验证
- arduino - Lilygo ttgo t-beam 和 Lora 发送或接收问题
- linux - 关于postgresql配置的基本问题