首页 > 解决方案 > 从转换中的数据更新全局存储

问题描述

我目前有一个简单的拓扑:

    KStream<String, Event> eventsStream = builder.stream(sourceTopic);
    eventsStream.transformValues(processorSupplier, "nameCache")
        .to(destinationTopic);

我的事件有时有一个键/值对,而其他时候只有键。我希望能够将值添加到那些缺少值的事件中。我可以在本地状态存储中正常工作,但是当我添加更多任务时,有时键/值事件和值事件位于不同的线程中,因此它们没有正确更新。

我想为此使用全局状态存储,但我很难弄清楚当新的键/值对进入时如何更新全局存储。我使用以下代码创建了一个全局状态存储:

    builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
      @Override
      public Processor<String, String> get() {
        return new Processor<String, String>() {
          private ProcessorContext context;
          @Override
          public void init(final ProcessorContext processorContext) {
            this.context = processorContext;
          }

          @Override
          public void process(final String key, final String value) {
            context.forward(key, value);
          }

          @Override
          public void close() {
          }
        };
      }
    });

据我所知,它正在工作,但由于该主题中没有数据,我不确定。

所以我的问题是如何从 transformValues 内部更新全局存储? store.put()失败并显示全局存储为只读错误。

在 Kafka Streams 上找到了 Write to GlobalStateStore,但接受的答案只是说要更新基础主题,但我不知道该怎么做,因为该主题不在我的流中。

---已编辑---

我在接受的答案中更新了 #1 的代码。我看到新的键/值对出现在global_store. 但是 globalStore 似乎没有看到新键。如果我重新启动应用程序,它会用主题中的数据填充缓存,但在我停止/启动应用程序之前,新键不可见。

process(String, String)在全局存储处理器中添加了日志记录,它显示正在处理的新密钥。有任何想法吗?

标签: apache-kafka-streams

解决方案


  1. 你只能在transformValues中获得对全局状态存储的真正访问权限,如果你想更新一个全局状态存储,是的,你必须将更新发送到全局状态存储的底层输入主题,你的状态会更新消费此更新消息时的值。这背后的原因是,全局状态存储在所有应用程序实例上填充,并使用此输入主题进行容错。您可以通过对拓扑进行分支来做到这一点:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
        .to(destinationTopic);

//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
        .to("global_store");
  1. 使用低级 API手动构建拓扑,因此您可以使用接收器处理器的名称将消息转发到destinationTopic主题和global_state主题,ProcessorContext.forward以将消息转发到接收器处理器节点。

推荐阅读