首页 > 解决方案 > Confluent Kafka Close() 方法提交偏移量

问题描述

我使用 Confluent Kafka .NET 库。如果要遵循IConsumer Close()方法的文档:

提交偏移量,提醒组协调器消费者正在退出组,然后释放该消费者使用的所有资源。您应该调用 Close 而不是 Dispose (或之前)以确保及时的消费者组重新平衡

在我的情况下,autocommit设置为false. 如果我无法处理消息,我只想让应用程序崩溃,并且我不想提交偏移量。

我的问题是:

如果我要Close()在退出应用程序时使用方法(这显然对您的消费者能够重新平衡很重要)是否会在autocommit设置为时提交偏移量false?还是仅在autocommit设置为时才有效true

标签: c#apache-kafkaconfluent-platform

解决方案


您可以查看源代码中的Close()函数:

 public void Close()
    {
        **// commits offsets and unsubscribes.**
        kafkaHandle.ConsumerClose();
        if (this.handlerException != null)
        {
            var ex = this.handlerException;
            this.handlerException = null;
            throw ex;
        }

        Dispose(true);
        GC.SuppressFinalize(this);
    }

因此,对 kafkaHandle.ConsumerClose() 和 Dispose(true) 的 Close() 函数调用: SafeKafkaHandle::kafkaHandle.ConsumerClose():

internal void ConsumerClose()
    {
        ThrowIfHandleClosed();
        
        ErrorCode err = **Librdkafka.consumer_close(handle);**
        if (err != ErrorCode.NoError)
        {
            throw new KafkaException(CreatePossiblyFatalError(err, null));
        }
    }

Librdkafka.consumer_close

* @remark This call will block until the consumer has revoked its assignment,
      calling the \c rebalance_cb if it is configured, committed offsets
      to broker, and left the consumer group.
      The maximum blocking time is roughly limited to session.timeout.ms.

所以,是的。即使您配置为 autoOffset=false,也要调用 Close() 函数来提交。[也许我在特殊情况下错了,有这个标志]

Dispose() 呢?我们去SafeKafkaHandle的祖父基地:SafeHandleZeroIsInvalid:SafeHandle。SafeHandle 是Microsoft 类,所以那里不可能有一些 kafka 句柄。

所以,看起来“正确的方法”是在消费者析构函数中调用 Close(),但是如果你想退出而不提交,只调用 Dispose()。

祝你好运


推荐阅读