java - kafka 连接到 Google BigQuery 抛出错误 java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
问题描述
我正在尝试从 Kafka 主题流式传输到 Google BigQuery。我的connect-standalone.properties文件如下:
bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000
#plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0
我的bigquery-sink.properties如下
name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/xyz.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60_000L
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null
我运行代码如下
#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:$KAFKA_HOME/libs
export KAFKA_DEBUG=y
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
它返回此错误
[2021-03-13 18:17:38,499] INFO Started o.e.j.s.ServletContextHandler@2ecf5915{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2021-03-13 18:17:38,503] INFO Started http_8083@6badba10{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2021-03-13 18:17:38,503] INFO Started @1981ms (org.eclipse.jetty.server.Server:379)
[2021-03-13 18:17:38,503] INFO Advertised URI: http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:248)
[2021-03-13 18:17:38,503] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-13 18:17:38,503] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2021-03-13 18:17:38,518] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
它抱怨找不到这个类
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
但是,在目录下
/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
它在 kafka-clients-1.1.0.jar
jar tvf kafka-clients-1.1.0.jar|grep org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
2051 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$1.class
1175 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$2.class
2876 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$3.class
1132 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$4.class
1801 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$CompositeValidator.class
3274 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ConfigKey.class
1255 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Importance.class
1104 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonEmptyString.class
2144 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonEmptyStringWithoutControlChars.class
922 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$NonNullValidator.class
2072 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Range.class
583 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Recommender.class
1513 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Type.class
293 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Validator.class
1943 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ValidList.class
2196 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$ValidString.class
1269 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef$Width.class
34225 Fri Mar 23 22:54:26 GMT 2018 org/apache/kafka/common/config/ConfigDef.class
所以我不知道这里缺少什么?
加载的罐子如下所示
jvm.classpath = /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:/data6/hduser/kafka_2.12-1.1.0/libs:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/argparse4j-0.7.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/commons-lang3-3.5.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-api-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-file-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-json-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-runtime-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/connect-transforms-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/guava-20.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-api-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-locator-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/hk2-utils-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-annotations-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-core-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-databind-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-jaxrs-base-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-jaxrs-json-provider-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jackson-module-jaxb-annotations-2.9.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javassist-3.20.0-GA.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javassist-3.21.0-GA.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.annotation-api-1.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.inject-1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.inject-2.5.0-b32.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.servlet-api-3.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-client-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-common-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-container-servlet-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-guava-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-media-jaxb-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jersey-server-2.25.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-client-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-continuation-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-http-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-io-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-security-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-server-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-servlet-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-servlets-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jetty-util-9.2.24.v20180105.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/jopt-simple-5.0.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-clients-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-log4j-appender-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-examples-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-streams-test-utils-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka-tools-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0-sources.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0-test-sources.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/kafka_2.12-1.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/log4j-1.2.17.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/lz4-java-1.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/maven-artifact-3.5.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/metrics-core-2.2.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/plexus-utils-3.1.0.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/reflections-0.9.11.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/rocksdbjni-5.7.3.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-library-2.12.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-logging_2.12-3.7.2.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/scala-reflect-2.12.4.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/slf4j-api-1.7.25.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/slf4j-log4j12-1.7.25.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/snappy-java-1.1.7.1.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/validation-api-1.1.0.Final.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/zkclient-0.10.jar:/data6/hduser/kafka_2.12-1.1.0/bin/../libs/zookeeper-3.4.10.jar
os.spec = Linux, amd64, 3.10.0-1062.9.1.el7.x86_64
os.vcpus = 6
解决方案
谢谢大家。
我使用的是较旧的 Kafka 版本。
我将集群中的Kafka从kafka_2.12-1.1.0升级到最新的稳定版本kafka_2.12-2.7.0。我还将 zookeeper 从 zookeeper-3.4.6 升级到 apache-zookeeper-3.6.2-bin 版本。
此外,在运行文件中,我添加了以下内容:
#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
CLASSPATH 的第一部分来自插件
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
这解决了这个问题
推荐阅读
- 3d - Gnuplot 3d depthorder with multiple plots
- php - 在 Windows 上为程序安装一个 PHP 扩展
- django - 我在 Django 中做了一个评论模型。我将如何制作评论回复模型?
- spring - 在 SpringMVC 中,我想用 Http Session 创建 Hello World
- react-native - React 原生扫描库
- git - git hook 首次提交到新分支
- css - 带有 Sass 的 Eclipse CSS 调试不加载更改
- powershell - PowerShell module not exposing commands properly
- plot - how to use Graphviz to draw a node pointed by an arrow?
- wpf - Update Textbox after each iteration VB.Net WPF