java - 在 Kafka 中通过重放进行精确一次处理
问题描述
我正在使用 Kafka 进行事件日志/处理。我正在寻找(尽可能接近)一次处理,同时在分区(重新)分配期间支持“重播”,通知重播的事件处理程序以便它可以重建它的状态
这是我的代码:
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final BiFunction<String, Boolean, String> eventHandler;
private final long[] startingCommitOffsets;
public void onParitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(p -> startingCommitOffsets[p.partition()] = consumer.position(p));
consumer.seekToBeginning(partitions);
}
public void run() {
while (true) {
var records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
var commitRecords = new HashMap<TopicPartition, OffsetAndMetadata>();
producer.beginTransation();
records.forEach(r -> {
var isReplay = r.offset() < startingCommitOffsets[r.partition()];
var resultEvent = eventHandler.apply(r.value(), isReplay);
producer.send(new ProducerRecord<>(r.topic(), r.key(), resultEvent));
if (!isReplay) {
commitRecords.put(new TopicPartition(r.topic(), r.partition(), new OffsetAndMetadata(r.offset()));
}
});
producer.commitTransaction();
if (!commitRecords.isEmpty()) {
consumer.commitSync(commitRecords);
}
}
}
我的问题:
- 分配分区后,我保存当前位置并寻找到开头。这不会改变承诺的立场吗?(文档不清楚)
product.commitTransaction()
并且consumer.commitSync()
是两个独立的操作。如果后者失败,我们将已经提交了一些新事件,这些新事件将在下次处理事件时重复 - 有没有办法将它们组合成一个操作?
解决方案
分配分区后,我保存当前位置并寻找到开头。这不会改变承诺的立场吗?
commitAsync() or commitSync()
在您明确调用或之前,承诺的位置不会改变auto.commit.enable=true
producer.commitTransaction()
并且consumer.commitSync()
是两个独立的操作。如果后者失败,我们将已经提交了一些新事件,这些新事件将在下次处理事件时复制。有没有办法将它们组合成一个操作?
producer.sendOffsetsToTransaction()
此方法可能是您正在寻找的方法,以实现仅一次处理。
从文档中:
将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即
lastProcessedMessageOffset+1
.
更重要的是,
请注意,消费者应该拥有
enable.auto.commit=false
并且也不应该手动提交偏移量(通过同步或异步提交)。
您可以推断出您将得到的TopicPartition
和 偏移量。ConsumerRecord
poll()
只需将它们 (new TopicPartition(record.topic(), record.partition())
和new OffsetAndMetadata(record.offset())
) 存储在地图中,并在您想要提交时传递它。
下面的代码片段可以让你有个想法(参考):
KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);
producer.initTransactions();
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-group-id”,
"isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> map = new LinkedHashMap<>();
for (ConsumerRecord record : records) {
producer.send(producerRecord(“outputTopic”, record));
map.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
}
producer.sendOffsetsToTransaction(offsetMap, group);
producer.commitTransaction();
}
发送偏移量后,我们提交它们。
推荐阅读
- java - 如何从firebase数据库中获取一个孩子
- java - Intellij上“clean”和“clean tomcat工作目录”的等效操作是什么?
- amazon-ecs - 阶梯函数 - sqs - ecs
- c# - 方法不断将相同的东西添加到列表中
- c# - 您将如何在删除其余数字的同时提取一组数字中的第一个数字?
- google-cloud-platform - Google Speech to Text 最佳值
- kendo-ui - Kendo UI 无法绑定 ViewModel
- python - 虽然循环没有以“break”退出
- javascript - JavaScript解释器如何将全局语句添加到事件队列中?
- python - 在捕获并立即引发异常时避免“在处理上述异常期间,发生了另一个异常”