首页 > 解决方案 > Kafka Streams 中的状态过滤/flatMapValues?

问题描述

我正在尝试编写一个简单的 Kafka Streams 应用程序(针对 Kafka 2.2/Confluent 5.2),以将具有至少一次语义的输入主题转换为精确一次的输出流。我想编码以下逻辑:

(这可以保证根据我们从上游系统获得的排序保证提供正确的结果;我不想在这里做任何神奇的事情。)

起初我以为我可以使用Kafka StreamsflatMapValues运算符来做到这一点,它可以让您将每个输入消息映射到具有相同键的零个或多个输出消息。但是,该文档明确警告:

这是一个无状态的逐条记录操作(参见 transformValues(ValueTransformerSupplier, String...) 用于有状态的值转换)。

这听起来很有希望,但transformValues文档并没有明确说明如何为每个输入消息发出零个或一个输出消息。除非这就是// or null示例中的旁白想要表达的意思吗?

flatTransform看起来也很有希望,但我不需要操纵密钥,如果可能的话,我想避免重新分区。

任何人都知道如何正确执行这种过滤?

标签: apache-kafkaapache-kafka-streams

解决方案


您可以Transformer用于实现如上所述的有状态操作。为了不向下游传播消息,您需要null从方法返回,这在java doctransform中提到。Transformer您可以通过processorContext.forward(key, value). 下面提供的简化示例

kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName)

public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, String> keyValueStore;

    public DemoTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String existingValue = keyValueStore.get(key);
        if (/* your condition */) {
            processorContext.forward(key, value);
            keyValueStore.put(key, value);
        }

        return null;
    }

    @Override
    public void close() {
    }
}

推荐阅读