首页 > 解决方案 > Autowired不打印值时的KStream对象

问题描述

我正在尝试创建 kstreams bean 并在我的服务中自动装配它。但是,即使我得到相同的对象 stream.print() 也没有给出任何价值,但在同一个 bean 中的 print 正在工作。我想我没有得到配置相同的 StreamBuilder。

配置文件

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Autowired private KafkaProperties kafkaProperties;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
        props.put(JsonDeserializer.DEFAULT_VALUE_TYPE, String.class);
        return new StreamsConfig(props);
    }

    @Bean
    public KStream<String, String> kStreamJson(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("topictest", Consumed.with(Serdes.String(), Serdes.String()));
        //stream.print();
        return stream;
    }

}

服务

此处的打印功能不会引发任何错误,也不会打印任何值

@Service
public class KStreamsService {

    @Autowired
    KStream<String, String> kStream;

    void process() {
        System.out.println("Hai");
        kStream.print();
    }
}

主要的

@SpringBootApplication
public class KStreamsApplication {

    @Autowired
    KStreamsService kStreamsService;

    public static void main(String[] args) {
        SpringApplication.run(KStreamsApplication.class, args);
    }

    private void run() {

        kStreamsService.process();

    }
}

我在这里做错了吗?

标签: apache-kafkaapache-kafka-streamsspring-kafka

解决方案


目前尚不清楚您run()在应用程序中的何处调用该方法。

但是,调用stream.print()该服务为时已晚,因为此时流已经启动。

这对我有用...

@SpringBootApplication
public class KStreamsApplication {

    @Autowired
    KStreamsService kStreamsService;

    public static void main(String[] args) {
        SpringApplication.run(KStreamsApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamsBuilderFactoryBean fb) {
        fb.setAutoStartup(false);
        return args -> {
            run();
            fb.start();
        };
    }

    private void run() {

        kStreamsService.process();

    }

}

[KSTREAM-SOURCE-0000000000]: null, foo

推荐阅读