首页 > 解决方案 > 使用 Spring Cloud Stream Kafka Binder 在批处理功能中发布多条消息

问题描述

我正在寻找一个使用spring cloud stream kafka binder(没有Kafka Streams)创建功能样式处理器的示例,该处理器可以使用来自一个主题的一批n条消息并将m条消息发布到另一个主题(m < n)。我尝试了以下方法:

public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
    return ((batch) -> {
        Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
            .collect(Collectors.groupingBy(result -> IncomingEvent::key));
        List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
        for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
            OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
            aggregationResults.add(
                MessageBuilder.withPayload(outgoingEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
                .build() 
            );
        }
        return aggregationResults;
    });
}

但是,这会产生带有消息数组的单个事件。我尝试将函数的返回类型从 List<Message> 更改为 Flux<Message>,然后返回 Flux.fromIterable(aggregationResults),这似乎是在发布多条消息,但这些消息似乎是带有属性的 Flux 的序列化实例scanAvailable 和预取而不是实际的消息。我在网上找不到任何实现这一目标的例子。看到这样的例子会很有帮助。

标签: apache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


我认为不支持。使用Consumer<List<IncomingEvent>>和使用StreamBridge发布出站消息。

https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

编辑

看来我错了;见https://github.com/spring-cloud/spring-cloud-stream/issues/2143

这是一个文档请求。云流框架支持一个未记录的特性来发布一批消息使用方法签名,如public Function<Whatever, List<Message<POJO>>> myMethod(). 这导致列表中的每条消息都由活页夹单独发布。

如果它不适合你,我建议你对这个问题发表评论。


推荐阅读