Kafka Connect fails after one broker node goes down

Problem description

I have one ZooKeeper node, 2 clustered brokers, and 2 clustered Connect workers. The Connect workers run fine and fully cover for each other. But when one broker node goes down, the Connect workers log that they are not connected to the broker and the connectors stop working. I checked the connector topics, and nothing is pushed to them after the problem occurs.

version: '2'

volumes:
  zoo-data:
  zoo-log:
  broker-data:
  broker-log:  
  broker2-data: 
  broker2-log:  
  connecotr-data:
  connecotr2-data:  
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"      
    volumes:
      - zoo-data:/var/lib/zookeeper/data
      - zoo-log:/var/lib/zookeeper/log
    environment:
#      ZOOKEEPER_CLIENT_PORT: 22181
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2

#      ZOOKEEPER_SERVERS: zookeeper:2888:3888      

  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    volumes:
      - broker-data:/var/lib/kafka/data
      - broker-log:/var/lib/kafka/log
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://:9092 
      DELETE_TOPIC_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      BOOTSTRAP_SERVERS: broker2:29094,broker:29092
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      
  broker2:
    image: confluentinc/cp-server:6.2.0
    hostname: broker2
    container_name: broker2
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
      - "9103:9103"
    volumes:
      - broker2-data:/var/lib/kafka/data
      - broker2-log:/var/lib/kafka/log
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181      
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_LISTENERS: PLAINTEXT://broker2:29094,PLAINTEXT_HOST://:9094         
      DELETE_TOPIC_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      BOOTSTRAP_SERVERS: broker2:29094,broker:29092
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  #    BOOTSTRAP_SERVERS: broker2:29094,broker:29092
  
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 9000:9000
    environment:
      - KAFKA_BROKERCONNECT=broker:29092,broker2:29094
    depends_on: 
      - broker

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092,broker2:29094'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
#    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: connect
    container_name: connect
    depends_on:     
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092,broker2:29094'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-mysql
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect=DEBUG"
      # CLASSPATH required due to CC-2422
     # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
   #   CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
    #  CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR    
    volumes:
      - connecotr-data:/etc/docker/volumnes/kafka/connector/
            
  connect2:
    image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
#    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: connect2
    container_name: connect2
    depends_on:     
      - schema-registry
    ports:
      - "18083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker2:29094,broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-mysql
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect=DEBUG,org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR"
    volumes:
      - connecotr2-data:/etc/docker/volumnes/kafka/connector/

Tags: apache-kafka, docker-compose, confluent-platform

Solution

Your Connect internal topics have a replication factor of only 1:

  CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-storage  # renamed - it is not only for mysql
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

If the broker hosting these topics crashes, then you should expect Connect to crash/stop working as well.
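
To let the Connect cluster survive the loss of one broker, raise the replication factor of the three internal topics to 2. A minimal sketch of the corrected worker environment, assuming the two-broker compose file above (the topic names are just illustrative):

  CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 2   # one copy on each broker
  CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 2
  CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 2

These settings only apply when the internal topics are first created, so topics that already exist with replication factor 1 need to be recreated (or have their replication increased via a partition reassignment) before the change takes effect. Since connect and connect2 share the same group id and storage topics, the same values should be set on both workers.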

