c# - Confluent Kafka Close() 方法提交偏移量
问题描述
我使用 Confluent Kafka .NET 库。如果要遵循IConsumer
Close()
方法的文档:
提交偏移量,提醒组协调器消费者正在退出组,然后释放该消费者使用的所有资源。您应该调用 Close 而不是 Dispose (或之前)以确保及时的消费者组重新平衡
在我的情况下,autocommit
设置为false
. 如果我无法处理消息,我只想让应用程序崩溃,并且我不想提交偏移量。
我的问题是:
如果我要Close()
在退出应用程序时使用方法(这显然对您的消费者能够重新平衡很重要)是否会在autocommit
设置为时提交偏移量false
?还是仅在autocommit
设置为时才有效true
?
解决方案
您可以查看源代码中的Close()函数:
public void Close()
{
**// commits offsets and unsubscribes.**
kafkaHandle.ConsumerClose();
if (this.handlerException != null)
{
var ex = this.handlerException;
this.handlerException = null;
throw ex;
}
Dispose(true);
GC.SuppressFinalize(this);
}
因此,对 kafkaHandle.ConsumerClose() 和 Dispose(true) 的 Close() 函数调用: SafeKafkaHandle::kafkaHandle.ConsumerClose():
internal void ConsumerClose()
{
ThrowIfHandleClosed();
ErrorCode err = **Librdkafka.consumer_close(handle);**
if (err != ErrorCode.NoError)
{
throw new KafkaException(CreatePossiblyFatalError(err, null));
}
}
* @remark This call will block until the consumer has revoked its assignment,
calling the \c rebalance_cb if it is configured, committed offsets
to broker, and left the consumer group.
The maximum blocking time is roughly limited to session.timeout.ms.
所以,是的。即使您配置为 autoOffset=false,也要调用 Close() 函数来提交。[也许我在特殊情况下错了,有这个标志]
Dispose() 呢?我们去SafeKafkaHandle的祖父基地:SafeHandleZeroIsInvalid:SafeHandle。SafeHandle 是Microsoft 类,所以那里不可能有一些 kafka 句柄。
所以,看起来“正确的方法”是在消费者析构函数中调用 Close(),但是如果你想退出而不提交,只调用 Dispose()。
祝你好运
推荐阅读
- pyomo - index_set() 中有什么不同的东西
? - c++ - 为什么我的程序中存在内存泄漏?
- xml-parsing - 如何学习 SPML
- flutter - 在 Android 中为 Flutter 使用远程仓库会产生编译错误
- c# - 如何使用 3party Criipto 交换令牌 Owin OpenID 的代码
- python - 如何循环通过 CSV 读取并根据一列提取排名?
- proxy - 阻止 Internet 连接的 OPNsense 规则不适用于 www.google.com
- r - R中的异常值截止
- c# - 修改映像以使用除 root [.net core] 以外的用户运行
- php - Formstack GET 提交数组