java - 为什么 Kafka 的 seekToBeginning 和 seekToEnd 不能与 assign 一起使用?
问题描述
说,我想检查特定分区的 Kafka 中第一条和最后一条消息的偏移量。我的想法是将该assign(…)
方法与seekToBeginning(…)
and一起使用seekToEnd(…)
。不幸的是,这不起作用。
如果我设置AUTO_OFFSET_RESET_CONFIG
为"latest"
,seekToBeginning(…)
则没有效果;如果我将其设置为"earliest"
,seekToEnd(…)
则不起作用。似乎对我的消费者来说唯一重要的是AUTO_OFFSET_RESET_CONFIG
.
我看过一个类似的话题,但问题是处理的subscribe()
,而不是assign()
方法。建议的解决方案是实现ConsumerRebalanceListner
并将其作为参数传递给subscribe()
方法。不幸的是,该assign()
方法只有一个签名,并且只能获取主题分区列表。
问题是:是否可以使用orseekToBeginning()
方法。如果是,如何?如果不是,为什么?seekToEnd()
assign()
我的代码的相关片段:
KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);
consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...
记录器打印偏移量 n,这是所考虑主题的最大(最新)偏移量。
解决方案
我注意到这种行为在 MockConsumer 中有错误且不一致。文档说他们是懒惰的,但会在 position() 调用后触发。但对于 MockConsumer 来说,情况并非如此。特别是,我发现它在大约 1.0 和 2.2.2 之间适用于 MockConsumer,并且在 2.3.0 之后被破坏
取而代之的是,我选择执行以下操作,这在 MockConsumer 和真实的情况下始终有效:
// consistently working seed to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seed to end
consumer.endOffsets(partitions).forEach(consumer::seek);
如果有线程同时调用轮询,这会更危险,但在我的情况下效果很好,我只想在应用程序开始轮询时手动控制偏移位置。
推荐阅读
- c# - 时间:2019-01-02 标签:c#singleworkerthread,singleGUIthread
- mysql - 访问数据库表值并将它们用作mysql中存储过程的参数
- amazon-web-services - 在 Cognito 中查找登录用户数
- laravel - 是否可以使用来自另一个共享主机的配置发送电子邮件?
- symfony - 翻译自定义验证器消息
- java - JPA - 多对多 - 创建/更新时违反外键
- c# - 为什么文件 IFormFile = null?
- angularjs - 从前端禁用按钮
- mysql - ER_NOT_SUPPORTED_AUTH_MODE:使用 mysql 对sails.js 应用程序进行dockerize
- ios - 崩溃专用 String.imageSize(),