首页 > 解决方案 > Broken Pipe 错误后 Kafka 不会重新加入集群

问题描述

我有一个带有 3 个代理和 3 个动物园管理员的 Kafka 集群。根据 Kafka server.log,它由于某种未知原因间歇性地遇到管道中断错误。在损坏的管道错误消失后,Kafka 代理决定离开集群并成为自己的领导者,而不是重新加入集群(因为它将 ISR 从 3 缩小到 1)。

到目前为止,唯一的解决方法是重新启动代理,它将作为跟随者正常重新加入集群。但是我们不能每次出现类似问题时都手动重启。

[2019-05-10 10:32:48,344] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:209)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:172)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:746)
    at org.apache.kafka.common.network.Selector.close(Selector.java:734)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at kafka.network.Processor.poll(SocketServer.scala:628)
    at kafka.network.Processor.run(SocketServer.scala:545)
    at java.lang.Thread.run(Thread.java:745)
[2019-05-10 10:32:48,368] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:209)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:159)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:746)
    at org.apache.kafka.common.network.Selector.close(Selector.java:734)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at kafka.network.Processor.poll(SocketServer.scala:628)
    at kafka.network.Processor.run(SocketServer.scala:545)
    at java.lang.Thread.run(Thread.java:745)
[2019-05-10 10:32:53,422] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:209)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:159)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:746)
    at org.apache.kafka.common.network.Selector.close(Selector.java:734)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at kafka.network.Processor.poll(SocketServer.scala:628)
    at kafka.network.Processor.run(SocketServer.scala:545)
    at java.lang.Thread.run(Thread.java:745)
[2019-05-10 10:32:56,976] INFO [Partition CS_NL_CUSTOMER_ADD-1 broker=4] Shrinking ISR from 4,6 to 4 (kafka.cluster.Partition)
[2019-05-10 10:32:56,994] INFO [Partition CS_NL_CUSTOMER_ADD-1 broker=4] Cached zkVersion [24394] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2019-05-10 10:32:56,994] INFO [Partition _confluent-controlcenter-5-1-0-1-MetricsAggregateStore-changelog-3 broker=4] Shrinking ISR from 4,6 to 4 (kafka.cluster.Partition)
[2019-05-10 10:32:57,023] INFO [Partition _confluent-controlcenter-5-1-0-1-MetricsAggregateStore-changelog-3 broker=4] Cached zkVersion [3724] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2019-05-10 10:32:57,023] INFO [Partition TEST_3_PART-2 broker=4] Shrinking ISR from 4,6 to 4 (kafka.cluster.Partition)
[2019-05-10 10:32:57,033] INFO [Partition TEST_3_PART-2 broker=4] Cached zkVersion [3300] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

据我了解卡夫卡,卡夫卡经纪人不是应该尽可能重新加入集群吗?为什么在出现断管错误后没有发生?还有一件事是,知道是什么导致了 Broken Pipe 错误吗?是网络问题吗?

标签: apache-kafka

解决方案


证书似乎有问题。您需要为代理颁发签名证书并将其添加到客户端的密钥库中。

Confluent 关于如何使用 SSL 配置加密和身份验证的指南应该让事情变得更加清晰。


推荐阅读