首页 > 解决方案 > 带有函数式编程的 spring-cloud-stream

问题描述

在我的应用程序中,streamlistner 从多个主题中消费了 kafka 消息,但我想为一个主题添加功能性消费者 bean,这可能是一些主题由 streamlistner 消费,而一些主题与消费者 bean 在同一个应用程序中?当我尝试创建的主题仅由streamlistner 消费而不是为消费者bean 创建?

应用程序.yml

spring:
  cloud:
    function:
        definition: printString
    stream:
        kafka:
            binder:
                brokers: localhost:9092
                zkNodes: localhost:2181
                autoCreateTopics: true
                autoAddPartitions: true
            bindings:
                printString-in-0:
                  destination: function-input-topic
                  group: abc
                  consumer:
                    maxAttempts: 1
                    partitioned: true
                    concurrency: 2
                xyz-input:
                  group: abc
                  destination: xyz-input-input
                  consumer:
                    maxAttempts: 1
                    partitioned: true
                    concurrency: 2

主题是为 xyz 输入主题创建的,但不是为函数输入主题创建的

消费豆

import java.util.function.Consumer;

@Component
public class xyz {

   @Bean
   Consumer<String> printString() {
       return System.out::print;
   }
}

kafkaConfig 接口

public interface KafkaConfig {

   @Input("xyz-input")
   SubscribableChannel inbound();
}

标签: apache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


不,我们过去尝试过,但是当两种编程模式发生冲突时会出现一些微妙的问题。将任何StreamListener内容分解为功能方式(主要是删除代码)非常简单,并且应用程序实际上变得更加简单。

一起摆脱KafkaConfig界面,摆脱,@EnableBinding你应该没问题。


推荐阅读