java - 设置的属性在 Kafka Streams 中不起作用
问题描述
我对 Kafka Streams 中的属性有疑问。例如,如果我在拓扑节点中设置此属性:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "window-exp-stream");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaConstants.KAFKA_BROKERS);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,EventTimeExtractor.class);
builder = new StreamsBuilder();
..........
..........
stream = new KafkaStreams(topology,config);
时间戳提取器不起作用。如果我将此属性用于状态存储,它也不起作用:
Map<String, String> changeLogConfigs = new HashMap<String, String>();
changeLogConfigs.put("retention.ms","600000" ); //10minuti
storeBuilder.withLoggingEnabled(changeLogConfigs);
builder.addStateStore(storeBuilder);
这是什么原因?我应该卸载 Kafka 和 Zookeeper 并重新安装它们吗?
解决方案
推荐阅读
- git - 如何在我的移动浏览器上运行 Flutter Web 应用
- javascript - 如何修复我的代码,以便它根据 MongoDB 中提交的 id 重定向页面?
- google-apps-script - 将图形资产添加到谷歌网络应用脚本的最佳途径是什么?
- java - order_inserts=true 的外键违规和混合子类实体的批次
- c# - MS Graph v1.0 个人资料图片资源未找到
- jquery - 使用 jQuery 计数器随机计数 1 或 2,并在达到 97 时使用 cookie 保存数字
- java - 如何将文件写入 Apache SSHD 服务器?
- python - Python fmin 使用 lambda 表达式
- sql - JSON 值 - Oracle PL/SQL:多个字段
- excel - if 语句不能在字符串数据上正常工作:Excel