首页 > 解决方案 > kafka 连接到 Google BigQuery 抛出错误 java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString

问题描述

我正在尝试从 Kafka 主题流式传输到 Google BigQuery。我的connect-standalone.properties文件如下:

bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000

#plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0

我的bigquery-sink.properties如下

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/xyz.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60_000L
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

我运行代码如下

#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:$KAFKA_HOME/libs
export KAFKA_DEBUG=y
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

它返回此错误

[2021-03-13 18:17:38,499] INFO Started o.e.j.s.ServletContextHandler@2ecf5915{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2021-03-13 18:17:38,503] INFO Started http_8083@6badba10{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2021-03-13 18:17:38,503] INFO Started @1981ms (org.eclipse.jetty.server.Server:379)
[2021-03-13 18:17:38,503] INFO Advertised URI: http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:248)
[2021-03-13 18:17:38,503] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-13 18:17:38,503] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2021-03-13 18:17:38,518] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

它抱怨找不到这个类

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString

但是,在目录下

/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib

它在 kafka-clients-1.1.0.jar

jar tvf kafka-clients-1.1.0.jar|grep org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
  2051 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$1.class
  1175 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$2.class
  2876 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$3.class
  1132 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$4.class
  1801 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$CompositeValidator.class
  3274 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ConfigKey.class
  1255 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Importance.class
  1104 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonEmptyString.class
  2144 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonEmptyStringWithoutControlChars.class
   922 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonNullValidator.class
  2072 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Range.class
   583 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Recommender.class
  1513 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Type.class
   293 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Validator.class
  1943 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ValidList.class
  2196 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ValidString.class
  1269 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Width.class
 34225 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef.class

所以我不知道这里缺少什么?

加载的罐子如下所示

        jvm.classpath = /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:/data6/hduser/kafka_2.12-1.1.0/libs:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/argparse4j-0.7.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/commons-lang3-3.5.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-api-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-file-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-json-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-runtime-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-transforms-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/guava-20.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-api-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-locator-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-utils-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-annotations-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-core-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-databind-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-jaxrs-base-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-jaxrs-json-provider-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-module-jaxb-annotations-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javassist-3.20.0-GA.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javassist-3.21.0-GA.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.annotation-api-1.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.inject-1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.inject-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.servlet-api-3.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-client-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-common-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-container-servlet-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-guava-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-media-jaxb-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-server-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-client-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-continuation-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-http-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-io-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-security-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-server-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-servlet-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-servlets-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-util-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jopt-simple-5.0.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-clients-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-log4j-appender-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-examples-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-test-utils-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-tools-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0-sources.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0-test-sources.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/log4j-1.2.17.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/lz4-java-1.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/maven-artifact-3.5.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/metrics-core-2.2.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/plexus-utils-3.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/reflections-0.9.11.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/rocksdbjni-5.7.3.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-library-2.12.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-logging_2.12-3.7.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-reflect-2.12.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/slf4j-api-1.7.25.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/slf4j-log4j12-1.7.25.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/snappy-java-1.1.7.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/validation-api-1.1.0.Final.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/zkclient-0.10.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/zookeeper-3.4.10.jar
        os.spec = Linux, amd64, 3.10.0-1062.9.1.el7.x86_64
        os.vcpus = 6

标签: javaapache-kafkagoogle-bigquery

解决方案


谢谢大家。

我使用的是较旧的 Kafka 版本。

我将集群中的Kafka从kafka_2.12-1.1.0升级到最新的稳定版本kafka_2.12-2.7.0。我还将 zookeeper 从 zookeeper-3.4.6 升级到 apache-zookeeper-3.6.2-bin 版本。

此外,在运行文件中,我添加了以下内容:

#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

CLASSPATH 的第一部分来自插件

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins

这解决了这个问题


推荐阅读