首页 > 解决方案 > Confluent-kafka-dotnet - 在互联网连接丢失几秒钟后,消费者在提交偏移量时挂起

问题描述

描述

我正在使用这个 kafka 客户端 - https://github.com/confluentinc/confluent-kafka-dotnet 在互联网连接丢失几秒钟后提交偏移量时,消费者会挂起。

如何重现

我有正在运行的消费者 - 一切都很好。但是,如果我关闭 Wifi 并在我的笔记本电脑上再次打开(我本地机器上的消费者 - 但生产相同) - 它会消耗第一个消息 - 并挂在提交 - 结果 - 僵尸消费者线程。我尝试在具有超时的单独线程中运行 Commit - 在超时的情况下 - 关闭当前消费者 - 并重新创建它 - 但它挂在 Close() 方法上(相同 - 使用 Unassign 和 Unsubscribe 和 Dispose 方法)

唯一可行的方法-我在带有超时的单独线程中运行 Commit()-在超时的情况下-只需重新创建另一个使用者循环(线程)-无需调用 Close()、Unassign、Unsubscribe、Dispose 方法。但是每次我做这个技巧时线程的数量都会增加 - 可能是因为死锁的线程。

也许类似 - https://github.com/dpkp/kafka-python/issues/1728 而这个 - https://github.com/confluentinc/confluent-kafka-dotnet/issues/1552

[ 1.6.3(但 1.5.3 - 相同)] Confluent.Kafka nuget 版本。[Windows 10 家庭版] 操作系统。

消费者配置:

var config = new ConsumerConfig
{
    BootstrapServers = _settings.BootstrapServers,
    GroupId = consumerGroupName,
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SaslUsername = _settings.SaslUsername,
    SaslPassword = _settings.SaslPassword,
    SecurityProtocol = KafkaProducer.SecurityProtocol,
    SaslMechanism = KafkaProducer.SaslMechanism,
    AllowAutoCreateTopics = true,
    Debug = "consumer,cgrp,topic,fetch"
};

日志: 这里一切正常:

.............

[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 25 (v4) 处获取主题 vad-UpdateFrameworkFromImportCommand [1] [thrd:sasl_plaintext://94.131 .241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9094/bootstrap] 处获取主题 vad-ImportFrameworksConsumerRetry [0]: sasl_plaintext://94.131.241.251:9094/2:获取 2/2/2 toppar(s)

Wifi 已关闭:

[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:断开连接(在 18793 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap ]:sasl_plaintext://94.131.241.251:9094/2:获取回退 500 毫秒:本地:代理传输失败 [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在 18781 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:获取回退 500 毫秒:本地:代理传输失败 [thrd :sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3: 断开连接(在 24894 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:获取 500 毫秒的退避:本地:代理传输失败 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理关闭:重新查询 [thrd:main] :主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:已断开连接(在 24050 毫秒后处于 UP 状态) [thrd:main]: Group "ImportFrameworksConsumer" 改变了状态 -> query-coord (join-state stable) [thrd:main]: Group "ImportFrameworksConsumer": 没有可用于协调器查询的代理:在状态查询坐标 sasl_plaintext://94.131.241.251:9094/2 中间隔:断开连接(在状态 UP 18793 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在状态 UP 18781 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9095/3:断开连接(在状态 UP 24894 毫秒后)Confluent.Kafka.Consumer2[ System.String,System.String] GroupCoordinator: 94.131.241.251:9093: 断开连接(在 24050 毫秒后处于 UP 状态) Confluent.Kafka.Consumer2[System.String,System.String] 4/4 代理关闭 Confluent.Kafka.Consumer2[ System.String,System.String] [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询-coord [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭: 重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询[thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在state query-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main ]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1 ]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd :main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用的代理对于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器的代理查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [ 1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询抛出异常:System.dll 中的“System.Net.Sockets.SocketException”抛出异常: System.dll 中的“System.ObjectDisposedException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.ObjectDisposedException”异常抛出:System.dll 中的“System.Net.WebException”异常抛出:Elasticsearch.Net 中的“Elasticsearch.Net.ElasticsearchClientException”。dll [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔[ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [ 2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main] :组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:主题 vad- ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [ thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer “:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-坐标 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:消费者组会话在 10184 毫秒后超时(加入状态稳定),没有来自组协调器的成功响应(代理 1,最后一个错误是成功):撤销分配并重新加入组 [thrd:sasl_plaintext:// 94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)sasl_plaintext://94.131.241.251:9 /2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)Confluent.Kafka.Consumer2 [System.String,System.String] [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251:9095 失败:未知错误(在状态 CONNECT 21040 毫秒后)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext: //94.131.241.251:9093/1:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态下 21041 毫秒后) sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251: 9095 failed: Unknown error (after 21040ms in state CONNECT) Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/1: Connect to ipv4#94.131.241.251:9093 failed: Unknown错误(在 CONNECT 状态下 21041 毫秒后)Confluent.Kafka。Consumer2 [System.String,System.String] [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)GroupCoordinator:94.131.241.251: 9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)Confluent.Kafka.Consumer`2[System.String,System.String]

Wifi 开启:

[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:在偏移量 0 (v4) 处获取主题 vad-ImportFrameworksConsumerRetry [1] [thrd:sasl_plaintext://94.131 .241.251:9095/bootstrap]:sasl_plaintext://94.131.241.251:9095/3:在偏移量 18 (v4) [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [2]: sasl_plaintext://94.131.241.251:9095/3:获取 2/2/2 toppar(s) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:在偏移量 9 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [0]:sasl_plaintext://94.131.241.251:9093/1:获取主题 vad-ImportFrameworksConsumerRetry [2]在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]: sasl_plaintext://94.131.241.251:9093/1:获取 2/2/2 个顶杆

标签: c#.netconfluent-kafka-dotnet

解决方案


推荐阅读