Kafka Streams: implementing a simple KeyValueStore where I can put and get data

Problem description

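I have a Kafka Streams application that operates on incoming state and needs to store that state before writing to the next topic. The write should happen only after the state has been updated in the local store.

Something like this: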

stream.map(this::getAndUpdateState)
          .map(this::processStateAndEvent)
          .to("topicname");

So in getAndUpdateState() I could do something like this:

state = store.get(key); // or new if null
state = updateState(state, event);  // update changes to state
store.put(key, state);  // write back the state
return state;

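How can I implement simple get() and put() operations on a Kafka Streams state store? I have tried using KeyValueStore, but ran into problems because I had to attach it to a source processor, a sink processor, and so on.

Alternatively, a way to get from and put to Kafka using a KTable or some other concept would also be fine.

Tags: java, apache-kafka, apache-kafka-streams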

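Solution

Thanks to user152468 and Matthias J. Sax for their suggestions.

I was able to do stateful processing in Kafka Streams using the transform() method. Complete working code, based on the original Pipe example, is given below.

Pipe.java: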

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe{
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();

        //  setting Configs
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // initializing a StreamsBuilder for building the topology
        final StreamsBuilder builder = new StreamsBuilder();
        // creating a KStream that continuously reads records from its source Kafka topic "streams-plaintext-input"
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        StoreBuilder<KeyValueStore<String, Long>> wordCountsStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("WordCountsStore"),
                Serdes.String(),
                Serdes.Long())
                .withCachingEnabled();

        builder.addStateStore(wordCountsStore);

        // re-key every record to the constant "key", update the count in the state store,
        // and only then write the transformed stream to the Kafka topic "streams-pipe-output"
        source.map((k, v) -> KeyValue.pair("key", v))
                .peek((k, s) -> System.out.printf("After keying: %s, value: %s\n", k, s))
                .transform(new SampleTransformSupplier(wordCountsStore.name()), wordCountsStore.name())
                .peek((k, s) -> System.out.printf("After transform: %s, value: %s\n", k, s))
                .to("streams-pipe-output");
        // generating the topology
        final Topology topology = builder.build();
        System.out.print(topology.describe());

        // constructing a streams client with the properties and topology
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        final CountDownLatch latch = new CountDownLatch(1);

        // attaching shutdown handler
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run(){
                streams.close();
                latch.countDown();
            }
        });
        try{
            streams.start();
            latch.await();
        } catch (Throwable e){
            // report the failure instead of exiting silently
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static class SampleTransformSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {

        final private String stateStoreName;

        public SampleTransformSupplier(String stateStoreName) {
            this.stateStoreName = stateStoreName;
        }

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new Transformer<String, String, KeyValue<String, String>>() {


                private KeyValueStore<String, Long> stateStore;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext processorContext) {
                    stateStore = (KeyValueStore<String, Long>) processorContext.getStateStore(stateStoreName);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    Long countSoFar = stateStore.get(key);
                    if(countSoFar == null){
                        System.out.print("Initializing count so far. this message should be printed only once");
                        countSoFar = 0L;
                    }
                    countSoFar += value.length();
                    System.out.printf(" Key: %s, Value: %s, Count: %d\n\n", key, value, countSoFar);
                    stateStore.put(key, countSoFar);
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                    // Nothing to do: the state store is opened and closed by Kafka Streams, not by the transformer.
                }
            };
        }
    }
}
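
The get / update / put step inside transform() can also be tried out without a broker. The following is only a minimal sketch of that logic, assuming a plain HashMap as a stand-in for the KeyValueStore<String, Long>; the class name CountStateSketch and the method updateCount are hypothetical names chosen for illustration and are not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;

// Broker-free sketch of the get / update / put step performed in transform().
// ASSUMPTION: a plain HashMap stands in for KeyValueStore<String, Long>; the class
// and method names are hypothetical and not part of Kafka Streams.
class CountStateSketch {

    // Mirrors the transformer body: read the running count (0 if absent),
    // add the length of the new value, write the result back, and return it.
    static long updateCount(Map<String, Long> store, String key, String value) {
        Long countSoFar = store.get(key);   // corresponds to stateStore.get(key)
        if (countSoFar == null) {
            countSoFar = 0L;                // first record seen for this key
        }
        countSoFar += value.length();
        store.put(key, countSoFar);         // corresponds to stateStore.put(key, countSoFar)
        return countSoFar;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        System.out.println(updateCount(store, "key", "hello")); // prints 5
        System.out.println(updateCount(store, "key", "kafka")); // prints 10
    }
}
```

Because the updated value is returned only after it has been put back, a caller that forwards the return value downstream sees state that is already in the store, which is the ordering asked for in the question. A real state store additionally persists the data and backs it with a changelog topic, which this stand-in does not attempt to model.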
