首页 > 解决方案 > 迁移到 Storm 2.2.0,Kafka 偏移问题

问题描述

我正在从 Storm 0.9.x 迁移到 Storm 2.2.0。遵循 Kafka spout 的代码:

    KafkaTridentSpoutConfig.Builder kafkaSpoutConfigBuilder = KafkaTridentSpoutConfig.builder(bootstrapServers, topic);

kafkaSpoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSizeBytes);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, clientId);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

return new KafkaTridentSpoutOpaque(kafkaSpoutConfigBuilder.build());

但是每次我重新启动 Storm Local Cluster 时,都会从头开始读取消息。如果我直接在 Kafka 中检查特定组的偏移量,则没有滞后。就像不读取来自 Kafka 的偏移量一样。使用 Kafka 2.8、Storm 2.2.0。Storm 0.9.X 没有这个问题。任何想法?

谢谢!

标签: javaapache-kafkaapache-storm

解决方案


推荐阅读