首页 > 解决方案 > Spring kafka 在运行时重新创建 Kafka Stream Topology

问题描述

我有一个基于 spring boot、spring-kafka 和 kafka-streams 的应用程序。当应用程序启动时,它会创建带有默认主题列表的 kafka 流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,出现了新的主题名称,我想将此主题添加到我的拓扑中。目前,我正在考虑以某种方式删除现有拓扑,关闭并清理 KafkaStreams,在我创建拓扑但使用新主题名称的地方运行逻辑,然后再次启动 KafkaStreams。我不想重新启动我的应用程序。有人可以建议我如何在运行时执行此操作吗?

标签: javaspring-bootapache-kafka-streams

解决方案


我找到了 1 个解决方案。我扩展了 StreamsBuilderFactoryBean:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }
}

当我构建拓扑时,我不使用 StreamsBuilder,而是使用 StreamsBuilderFactoryBean#getObject():

@Component

公共类动态流{

private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

public void init() {
    StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        //build topology
}

//call this method when stream reconfiguration is needed
public void reinitialize() {
    streamsBuilderFactoryBean.stop();
    init();
    streamsBuilderFactoryBean.start();
}

}


推荐阅读