How to define two instances of StreamsBuilderFactoryBean

Problem description

I'm new to Spring Kafka. For some reason I want to create two StreamsBuilderFactoryBean instances. As you can see below, I defined two of them: one named "commonDSLBuilder" and another named "propertyDSLBuilder", which sets props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). So "commonDSLBuilder" creates only one consumer, while "propertyDSLBuilder" creates four consumers.

@Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig streamsConfig = new StreamsConfig(props);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilder.setSingleton(Boolean.FALSE);
        return streamsBuilder;
    }

    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        StreamsConfig streamsConfig = new StreamsConfig(props);
        CleanupConfig cleanupConfig = new CleanupConfig(Boolean.TRUE, Boolean.TRUE);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
        streamsBuilder.setSingleton(Boolean.FALSE);
        return streamsBuilder;
    }
}

I use "commonDSLBuilder" like this:

@Configuration
public class BindPostDSL {

    private static final Logger log = LoggerFactory.getLogger(BindPostDSL.class);

    @Autowired
    @Qualifier("commonDSLBuilder")
    private StreamsBuilder builder;

    @Bean(name = "bindPostKStream")
    public KStream<String, String> kStream() {
        log.info("bind event processing started");
        KStream<String, String> stream = builder.stream("test");
        stream.foreach((key, value) -> {
            log.info("receive kafka bind post,key:{},value:{}", key, value);
        });
        return stream;
    }
}

But when I start the application, 5 consumers are created (I guess 1 from commonDSLBuilder + 4 from propertyDSLBuilder). How can I fix this?

2018-08-06 10:34:12 [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] INFO  StreamThread:336 - stream-thread [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] Starting
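
The thread names in this log show which Kafka Streams instance each thread belongs to. A stream thread is named `<client id>-StreamThread-<n>`, and when no client id is configured it defaults to the application id followed by a random UUID, so threads that share a prefix run in the same instance. The following is a minimal sketch that groups the five thread names above by that prefix. The class `StreamThreadCounter` and its method are hypothetical helpers for illustration, not part of Kafka or Spring.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamThreadCounter {

    private static final String MARKER = "-StreamThread-";

    // Counts stream threads per client id, where the client id is the part
    // of the thread name that precedes "-StreamThread-".
    public static Map<String, Integer> countByClientId(List<String> threadNames) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String name : threadNames) {
            int idx = name.lastIndexOf(MARKER);
            if (idx < 0) {
                continue; // not a stream thread name, skip it
            }
            counts.merge(name.substring(0, idx), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Thread names copied from the log lines above.
        List<String> names = Arrays.asList(
            "test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1",
            "test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2",
            "test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4",
            "test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3",
            "test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5");
        countByClientId(names).forEach(
            (clientId, n) -> System.out.println(clientId + " -> " + n + " thread(s)"));
    }
}
```

For this log the grouping gives one thread under the `827fcc9b...` client id and four under the `579948de...` client id, which matches the guess of 1 + 4: both factory beans start their own Kafka Streams instance.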

Tags: spring, apache-kafka, apache-kafka-streams, spring-kafka

Solution


You are on the right track. You need two StreamsBuilderFactoryBean beans and two KStream beans. Each KStream is built from its own specific StreamsBuilderFactoryBean. You don't need to call setSingleton(Boolean.FALSE) on the streamsBuilder.

@Bean
public FactoryBean<StreamsBuilder> commonDSLBuilder() {
    ...
    StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
    return streamsBuilder;
}

@Bean
public FactoryBean<StreamsBuilder> propertyDSLBuilder() {
    ...
    StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
    return streamsBuilder;
}

@Bean
public KStream<String, String> bindKStream(StreamsBuilder commonDSLBuilder) {
    KStream<String, String> kStream = commonDSLBuilder.stream("commonTopicName");
    kStream.foreach((key, value) -> { ... });
    return kStream;
}

@Bean
public KStream<String, String> propertyKStream(StreamsBuilder propertyDSLBuilder) {
    KStream<String, String> kStream = propertyDSLBuilder.stream("propertyTopicName");
    kStream.foreach((key, value) -> { ... });
    return kStream;
}
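
In the question, the two factory methods build almost the same property map, and the only difference is the thread count. Assuming the elided part of the factory methods above sets up those same properties, one possible way to avoid the duplication is a small helper like the sketch below. The class `StreamsProps` and its method names are hypothetical. To keep the sketch free of Kafka dependencies it uses the plain string keys that the `StreamsConfig` constants resolve to and leaves out the serde settings; in the real configuration class the constants and serdes from the question would be used instead.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamsProps {

    // Properties shared by both builders.
    public static Map<String, Object> baseProps(String applicationId, String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        return props;
    }

    // Returns a copy of the base properties with an explicit stream thread count.
    public static Map<String, Object> withThreads(Map<String, Object> base, int numThreads) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("num.stream.threads", numThreads);      // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // "test-streams" is an example application id, used here only for illustration.
        Map<String, Object> common = baseProps("test-streams", "127.0.0.1:9092");
        Map<String, Object> property = withThreads(common, 4);

        System.out.println(common.containsKey("num.stream.threads")); // prints false
        System.out.println(property.get("num.stream.threads"));       // prints 4
    }
}
```

Because `withThreads` copies the map, the common builder's properties stay untouched and keep the default thread count, while the property builder gets four threads.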
