首页 > 解决方案 > Kafka Producer 导致 org.apache.kafka.common.network.InvalidReceiveException: Invalid receive

问题描述

我在 Kubernetes 上运行 3-zookeeper-cluster 和 3-kafka-cluster。
卡夫卡似乎正在运行。
但是,如果我向某个主题生成一些消息并检查该主题,则根本没有消息。

这是我的经纪人说的。那就是说一些无效的接收或其他东西,有趣的是试图使主题运作良好但产生。
我也可以观看我早期在 Topics-ui 上制作的主题或模式,Topics-ui 是代理的 GUI 工具。
Schema-registry、Connect、Rest 的日志很好,因此代理似乎运行良好。

org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:748)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:748)

这是我使用terraform Statefulset的代理配置

          port {
            container_port = 9092
          }   

          env {
            name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_DEFAULT_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" 
            value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
          }   

          env {
            name = "KAFKA_ZOOKEEPER_CONNECT"
            value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
          }   

          env {
            name = "POD_IP"

            value_from {
              field_ref {
                field_path = "status.podIP"
              }   
            }   
          }   

          env {
            name = "HOST_IP"
            value_from {
              field_ref {
                field_path = "status.hostIP"
              }   
            }   
          }   

          env {
            name = "POD_NAME"

            value_from {
              field_ref {
                field_path = "metadata.name"
              }   
            }   
          }   

          env {
            name = "POD_NAMESPACE"

            value_from {
              field_ref {
                field_path = "metadata.namespace"
              }   
            }   
          }   

          command = [ 
            "sh",
            "-exec",
            "export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
          ]   

服务

resource "kubernetes_service" "kafka-service" {
  metadata {
    name = "kafka-service"

    labels = {
      app = "broker" 
    }
  }

  spec {
    selector = {
      app = "broker"
    }

    port {
      port = 9092
    }

    cluster_ip = "None"
  }

尝试生成的代码

kafka-console-producer --broker-list kafka-service:9092 --topic test

标签: kubernetesapache-kafkaterraformkafka-producer-apikafka-topic

解决方案


我最初的猜测是您可能正试图接收一个太大的请求。最大大小是 的默认大小socket.request.max.bytes,即 100MB。因此,如果您有大于 100MB 的消息,请尝试在server.properties.


推荐阅读