首页 > 解决方案 > ksqldb - 执行 'select * from stream' 导致找不到 io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

问题描述

我尝试了一下 ksqldb,并制作了一个像这样的 docker-compose.yml:

---
# Compose file format version 2; every container is declared under `services`.
version: '2'
services:
  # ZooKeeper node; client port 2181 is published to the host.
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.3.1"
    container_name: zookeeper
    hostname: zookeeper
    environment:
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_CLIENT_PORT: 2181
    ports: ["2181:2181"]

  # Single Kafka broker (Confluent enterprise image) with the Confluent
  # metrics reporter configured.
  broker:
    image: "confluentinc/cp-enterprise-kafka:5.3.1"
    container_name: broker
    hostname: broker
    depends_on: [zookeeper]
    ports: ["29092:29092", "9092:9092"]
    environment:
      # Identity and ZooKeeper connection.
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Two plaintext listeners, advertised as broker:29092 and localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      # Single-broker settings.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      # Message and fetch size limits (10,000,000 bytes each).
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 10000000
      # Confluent metrics reporter.
      KAFKA_METRIC_REPORTERS: "io.confluent.metrics.reporter.ConfluentMetricsReporter"
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  # Confluent Schema Registry; port 8081 is published to the host and the
  # Kafka store connection points at ZooKeeper.
  schema-registry:
    image: "confluentinc/cp-schema-registry:5.3.1"
    container_name: schema-registry
    hostname: schema-registry
    depends_on: [zookeeper, broker]
    ports: ["8081:8081"]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry

  # Kafka Connect worker (REST API on 8083) with String keys and Avro values.
  connect:
    image: confluentinc/cp-server-connect:5.3.1
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      # Host build output of the custom connector, mounted under /usr/share/java,
      # which is one of the directories listed in CONNECT_PLUGIN_PATH below.
      - C:\WS\prototypes\POCONE\Werkstatt\connect-coppclark\target:/usr/share/java/coppclark
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      # Internal config/offset/status topics, each with replication factor 1.
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # Keys are plain strings; values are Avro backed by the schema registry.
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      # The jar below is what makes the two interceptor classes loadable here.
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      # NOTE(review): this variable is normally read by the docker-compose CLI on
      # the host, not inside the container - confirm it has any effect here.
      COMPOSE_CONVERT_WINDOWS_PATHS: 1
      # Client-side size limits of 10,000,000 bytes for producer and consumer.
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 10000000
      CONNECT_CONSUMER_MAX_PARTITION_FETCH_BYTES: 10000000


  # https://docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/integrate-ksql-with-confluent-control-center/
  # Confluent Control Center on port 9021, pointed at the broker, Connect,
  # ksqlDB and Schema Registry services of this file.
  control-center:
    image: "confluentinc/cp-enterprise-control-center:5.3.1"
    container_name: control-center
    hostname: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports: ["9021:9021"]
    environment:
      PORT: 9021
      # Endpoints of the other services.
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      # ksqlDB is reached at ksqldb-server:8088 and advertised as localhost:8088.
      CONTROL_CENTER_KSQL_URL: 'http://ksqldb-server:8088'
      CONTROL_CENTER_KSQL_ADVERTISED_URL: 'http://localhost:8088'
      # Replication and partition counts, all set to 1.
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1

  # Confluent REST Proxy; listens on 8082 and talks to the broker and the
  # schema registry.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.1
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      # Quoted like every other port mapping in this file, so the HOST:CONTAINER
      # pair is always read as a string and never subject to YAML implicit typing.
      - "8082:8082"
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  # ksqlDB server 0.6.0; REST endpoint on 8088, connected to the broker,
  # Kafka Connect and the schema registry.
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.6.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_HOST_NAME: ksqldb-server
      KSQL_KSQL_CONNECT_URL: http://connect:8083
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      # NOTE(review): the ksqldb-server 0.6.0 image reportedly does not ship the
      # monitoring interceptors (and, unlike `connect`, no CLASSPATH adds them
      # here). With these two settings present, consumers fail with
      # "Class ...MonitoringConsumerInterceptor cannot be found" - remove both
      # lines if that error appears.
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0

  # ksqlDB CLI 0.6.0 container; the entrypoint is overridden to /bin/sh and a
  # TTY is allocated.
  ksqldb-cli:
    image: "confluentinc/ksqldb-cli:0.6.0"
    container_name: ksqldb-cli
    depends_on: [broker, ksqldb-server]
    tty: true
    entrypoint: "/bin/sh"

  # coppclark - static resources, served on port 8080
  coppclark-static-http:
    image: "springcloudstream/coppclarkstatic:0.0.1-SNAPSHOT"
    container_name: coppclark-static-http
    expose: ["8080"]
    ports: ["8080:8080"]

然后从 ksqldb 中创建一个新的(自定义)连接器

-- Registers a source connector named holidayEvent that uses the custom
-- CoppclarkSourceConnector class, configured with the CSV URL served by the
-- coppclark-static-http container and the target topic holidayEvent.
CREATE SOURCE CONNECTOR holidayEvent WITH (
 'connector.class' = 'CoppclarkSourceConnector',
 'topic' = 'holidayEvent',
 'url' = 'http://coppclark-static-http:8080/coppclark.csv',
 'csv.parser' = true
);

这导致这样的记录(在主题holidayEvent中):

{
  "CENTER_ID": "150",
  "ISO_CURRENCY_CODE": "SRD",
  "ISO_COUNTRY_CODE": "SR",
  "RELATED_FINANCIAL_CENTER": "Paramaribo",
  "EVENT_YEAR": "2067",
  "EVENT_DATE": "20671225",
  "EVENT_DAY_OF_WEEK": "Sun",
  "EVENT_nAME": "Christmas Day",
  "FILE_TYPE": "C"
}  

接着我基于该主题创建了一个流:

-- Declares a stream over the holidayEvent topic with AVRO values; no columns
-- are listed (presumably they come from the schema registry - confirm).
CREATE STREAM holidayEventStream WITH(kafka_topic='holidayEvent', value_format='AVRO');

到目前为止一切顺利,但是当我想从这个创建的 Stream 中进行选择时,我得到了以下错误:

ksql> select * from  HOLIDAYEVENTSTREAM EMIT CHANGES;
Failed to construct kafka consumer
Caused by: Class
        io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor cannot
        be found
Caused by:
        io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

我确定我错过了一些东西,但不知道下一步该做什么。

问候雷内

标签: ksqldb

解决方案


从您的 ksqldb-server 服务配置中删除这两行:

KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"   

ksqlDB 0.6 不附带拦截器。


推荐阅读