首页 > 解决方案 > KTable在Spring Boot应用程序中没有返回数据,但是可以查询

问题描述

我有一个使用 Kafka Streams 的 Spring Boot 应用程序。我有一个带有一些金融货币报价的 KTable,它是这样创建的:

@Bean(name = "indicativeQuotes")
public KTable<String, Quote> quoteKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(quoteTopicName,
            Materialized.<String,Quote,KeyValueStore<Bytes,byte[]>>as("quoteTable")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(Quote.class)));
}

我在另一个组件中 @Autowire 这个 bean,并使用以下代码对其进行测试:

@Autowired
private KTable<String, Quote> indicativeQuotes;

@PostConstruct
private void postConstruct() {
    doPrint();
}

public void doPrint() {
        ReadOnlyKeyValueStore<String, Quote> store = streamsBuilderFactoryBean.getKafkaStreams().store("quoteTable", QueryableStoreTypes.keyValueStore());
        store.all().forEachRemaining(keyValue -> log.info("Key: " + keyValue.key + " Value: " + keyValue.value));
        indicativeQuotes.foreach((k,v) -> log.info(k));}

该代码在通过 store 查询时记录了正确的值,但它在 foreach() 中没有输出任何内容,就好像表为空一样。我也尝试过 print() 和其他选项 - 没有任何异常都没有输出。

我开始认为我不能像那样注入 KTable bean,但是关于 kafka 流主题的 Spring 文档非常稀缺,我找不到好的例子。任何帮助将不胜感激。

更新。

我的用例是我有一个预定的 Quartz 作业,它应该在触发时将 KTable 的当前状态写入 Kafka 主题,如下所示:

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    TriggerKey triggerKey = jobExecutionContext.getTrigger().getKey();
    log.info("Job was triggered by: {}", triggerKey.getName());

    indicativeQuotes.filter((key, value) -> key.equals(triggerKey.getName()))
            .mapValues(quoteToCourseFixedMapper)
            .toStream()
            .peek((instrument, course)-> log.info("Sending courses for instrument: {}, {}", instrument, course))
            .to(quoteEventTopicName);
}

但我认为这段代码不起作用,因为它不是处理拓扑的一部分,我不能按需从 Ktable 中获取数据。我在这里有点疑惑,当然我可以在事件触发时通过 store 查询数据,但是对于这种用例,也许有更好的模式?基本上我很感兴趣是否可以将此触发的作业事件合并为处理管道的一部分。

标签: springspring-bootapache-kafka-streamsspring-kafka

解决方案


如果您只想将更新发布到另一个主题,请将 KTable 转换为 KStream 并使用 to() 函数。

KTable ktable = ...;
KStream ksteram = ktable.toStream();
kstream.to("topic", Produces.with(keySerde, valueSerde))

该主题将包含该表的更改日志。

显然因为一些生命周期相关的概念,你不能只注入(@autowire)KStream/KTable。您应该尽可能保持与 KafkaStreams 相关的代码类型。

因此,在您希望在某个“随机”时间对表的当前状态执行某些操作的特定情况下,您必须查询存储(表)。所以搜索 kafka steams 交互式查询。请记住,您需要从应用程序的所有实例中获取数据(如果您有多个实例。或者您可以使用全局存储。它需要一两天的搜索时间。


推荐阅读