首页 > 解决方案 > 设置的属性在 Kafka Streams 中不起作用

问题描述

我对 Kafka Streams 中的属性有疑问。例如,如果我在拓扑节点中设置此属性:

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "window-exp-stream");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConstants.KAFKA_BROKERS);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,EventTimeExtractor.class);
builder = new StreamsBuilder();
..........
..........
stream = new KafkaStreams(topology,config);

时间戳提取器不起作用。如果我将此属性用于状态存储,它也不起作用:

Map<String, String> changeLogConfigs = new HashMap<String, String>();
        changeLogConfigs.put("retention.ms","600000" ); //10minuti
        storeBuilder.withLoggingEnabled(changeLogConfigs);
        builder.addStateStore(storeBuilder);

这是什么原因?我应该卸载 Kafka 和 Zookeeper 并重新安装它们吗?

标签: javaapache-kafkaapache-kafka-streams

解决方案


推荐阅读