java - Java Kafka消费者不会从它离开的地方恢复
问题描述
我有生产者线程将数据推送到 Kafka 主题中,一个消费者线程一次轮询每个项目。这是消费者配置:
private final KafkaConsumer<Long, Integer> consumer;
public static final String TOPIC = "requestsss";
{
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
}
消费者循环运行以下代码:
ConsumerRecords<Long, Integer> consumerRecords = consumer.poll(Duration.ofMillis(10000));
consumer.commitSync();
return consumerRecords.records(TOPIC).iterator().next().value();
然后消费者进程在没有消费所有产生的消息的情况下停止。问题是当它重新启动时,它不会从它离开的地方恢复进程。相反,它就好像所有消息都在前一次运行中被消耗掉了。
有谁知道我做错了什么?
注意:我也尝试过设置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
既是真又是假,也去除了consumer.commitSync()
,但都没有产生预期的效果。
谢谢您的帮助
解决方案
推荐阅读
- python - python合并两个数据框并保留两者的值
- netbeans - Netbeans 中的 Polymer3 HTML 格式
- node.js - 在MongoDB中返回数据之前有条件地添加过滤器困境
- python - xarray 的导入错误返回“导入 defs 时 DLL 加载失败”
- javascript - loading="lazy" 但对于 html5 视频
- c# - 非静态字段、方法或属性需要对象引用 (Unity C#)
- c# - Base64 字符串编码的图像未加载到输出 PDF (TuesPechkin / Wkhtmltopdf)
- angular - Angular HttpClient 服务仅返回状态码为 200 的响应
- c# - EF Core 删除对自身的引用而不删除
- confluence - 将用户宏转换为 Connect 或 Forge