c# - Confluent-kafka-dotnet - 在互联网连接丢失几秒钟后,消费者在提交偏移量时挂起
问题描述
描述
我正在使用这个 kafka 客户端 - https://github.com/confluentinc/confluent-kafka-dotnet 在互联网连接丢失几秒钟后提交偏移量时,消费者会挂起。
如何重现
我有正在运行的消费者 - 一切都很好。但是,如果我关闭 Wifi 并在我的笔记本电脑上再次打开(我本地机器上的消费者 - 但生产相同) - 它会消耗第一个消息 - 并挂在提交 - 结果 - 僵尸消费者线程。我尝试在具有超时的单独线程中运行 Commit - 在超时的情况下 - 关闭当前消费者 - 并重新创建它 - 但它挂在 Close() 方法上(相同 - 使用 Unassign 和 Unsubscribe 和 Dispose 方法)
唯一可行的方法-我在带有超时的单独线程中运行 Commit()-在超时的情况下-只需重新创建另一个使用者循环(线程)-无需调用 Close()、Unassign、Unsubscribe、Dispose 方法。但是每次我做这个技巧时线程的数量都会增加 - 可能是因为死锁的线程。
也许类似 - https://github.com/dpkp/kafka-python/issues/1728 而这个 - https://github.com/confluentinc/confluent-kafka-dotnet/issues/1552
[ 1.6.3(但 1.5.3 - 相同)] Confluent.Kafka nuget 版本。[Windows 10 家庭版] 操作系统。
消费者配置:
var config = new ConsumerConfig
{
BootstrapServers = _settings.BootstrapServers,
GroupId = consumerGroupName,
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest,
SaslUsername = _settings.SaslUsername,
SaslPassword = _settings.SaslPassword,
SecurityProtocol = KafkaProducer.SecurityProtocol,
SaslMechanism = KafkaProducer.SaslMechanism,
AllowAutoCreateTopics = true,
Debug = "consumer,cgrp,topic,fetch"
};
日志: 这里一切正常:
.............
[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 25 (v4) 处获取主题 vad-UpdateFrameworkFromImportCommand [1] [thrd:sasl_plaintext://94.131 .241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9094/bootstrap] 处获取主题 vad-ImportFrameworksConsumerRetry [0]: sasl_plaintext://94.131.241.251:9094/2:获取 2/2/2 toppar(s)
Wifi 已关闭:
[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:断开连接(在 18793 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap ]:sasl_plaintext://94.131.241.251:9094/2:获取回退 500 毫秒:本地:代理传输失败 [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在 18781 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:获取回退 500 毫秒:本地:代理传输失败 [thrd :sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3: 断开连接(在 24894 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:获取 500 毫秒的退避:本地:代理传输失败 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理关闭:重新查询 [thrd:main] :主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:已断开连接(在 24050 毫秒后处于 UP 状态) [thrd:main]: Group "ImportFrameworksConsumer" 改变了状态 -> query-coord (join-state stable) [thrd:main]: Group "ImportFrameworksConsumer": 没有可用于协调器查询的代理:在状态查询坐标 sasl_plaintext://94.131.241.251:9094/2 中间隔:断开连接(在状态 UP 18793 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在状态 UP 18781 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9095/3:断开连接(在状态 UP 24894 毫秒后)Confluent.Kafka.Consumer2[ System.String,System.String] GroupCoordinator: 94.131.241.251:9093: 断开连接(在 24050 毫秒后处于 UP 状态) Confluent.Kafka.Consumer2[System.String,System.String] 4/4 代理关闭 Confluent.Kafka.Consumer2[ System.String,System.String] [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询-coord [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭: 重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询[thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在state query-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main ]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1 ]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd :main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用的代理对于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器的代理查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [ 1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询抛出异常:System.dll 中的“System.Net.Sockets.SocketException”抛出异常: System.dll 中的“System.ObjectDisposedException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.ObjectDisposedException”异常抛出:System.dll 中的“System.Net.WebException”异常抛出:Elasticsearch.Net 中的“Elasticsearch.Net.ElasticsearchClientException”。dll [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔[ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [ 2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main] :组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:主题 vad- ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [ thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer “:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-坐标 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:消费者组会话在 10184 毫秒后超时(加入状态稳定),没有来自组协调器的成功响应(代理 1,最后一个错误是成功):撤销分配并重新加入组 [thrd:sasl_plaintext:// 94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)sasl_plaintext://94.131.241.251:9 /2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)Confluent.Kafka.Consumer2 [System.String,System.String] [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251:9095 失败:未知错误(在状态 CONNECT 21040 毫秒后)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext: //94.131.241.251:9093/1:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态下 21041 毫秒后) sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251: 9095 failed: Unknown error (after 21040ms in state CONNECT) Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/1: Connect to ipv4#94.131.241.251:9093 failed: Unknown错误(在 CONNECT 状态下 21041 毫秒后)Confluent.Kafka。Consumer2 [System.String,System.String] [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)GroupCoordinator:94.131.241.251: 9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)Confluent.Kafka.Consumer`2[System.String,System.String]
Wifi 开启:
[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:在偏移量 0 (v4) 处获取主题 vad-ImportFrameworksConsumerRetry [1] [thrd:sasl_plaintext://94.131 .241.251:9095/bootstrap]:sasl_plaintext://94.131.241.251:9095/3:在偏移量 18 (v4) [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [2]: sasl_plaintext://94.131.241.251:9095/3:获取 2/2/2 toppar(s) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:在偏移量 9 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [0]:sasl_plaintext://94.131.241.251:9093/1:获取主题 vad-ImportFrameworksConsumerRetry [2]在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]: sasl_plaintext://94.131.241.251:9093/1:获取 2/2/2 个顶杆
解决方案
推荐阅读
- doit - 并行运行时等待依赖
- javascript - 从头开始创建 toUpperCase 函数时的问题
- c - c中的sizeof运算符对相同类型的指针在C中给出不同的结果
- kubernetes - Openshift vs Rancher,有什么区别?
- php - 指定为定义者的用户('mysql.infoschema'@'localhost')不存在 - 第一个 Laravel 项目
- c# - 插入 URL 的变量未显示在地址栏中
- python-3.x - Python 程序 ValueError:以 10 为底的 int() 的无效文字:
- php - 发布方法不适用于 IIS 重写规则
- flutter - 来自 FutureBuilder 的 Navigator.pop
- javascript - 克隆 chrome CTRL + F 实现