java - 迁移到 Storm 2.2.0,Kafka 偏移问题
问题描述
我正在从 Storm 0.9.x 迁移到 Storm 2.2.0。遵循 Kafka spout 的代码:
KafkaTridentSpoutConfig.Builder kafkaSpoutConfigBuilder = KafkaTridentSpoutConfig.builder(bootstrapServers, topic);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSizeBytes);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, clientId);
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaSpoutConfigBuilder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaTridentSpoutOpaque(kafkaSpoutConfigBuilder.build());
但是每次我重新启动 Storm Local Cluster 时,都会从头开始读取消息。如果我直接在 Kafka 中检查特定组的偏移量,则没有滞后。就像不读取来自 Kafka 的偏移量一样。使用 Kafka 2.8、Storm 2.2.0。Storm 0.9.X 没有这个问题。任何想法?
谢谢!
解决方案
推荐阅读
- php - Stripe API 通过 API 更新账号
- php - 来自 PDO 命令的开发控制台的 Postgres PHP 错误
- python - 我有更好的 Python 文档吗?更有条理?
- linux - 不知道如何运行 Electron Linux 构建
- android - 在 Kotlin 上的 RecyclerView 中保存 EditText 内容
- r - 如何从R中的栅格堆栈中提取时间序列
- android-camera - 光学纵横比与supportedPictureSizes 不一致
- python - args 和 *args 作为函数输入的区别
- freeradius - 在半径回复中返回自定义用户属性
- c++ - 为什么这个没有参数的构造函数似乎是这段代码的问题?