java - 无法启动使用 Windows bat 文件分发的 kafka 连接
问题描述
更新:
我查看了提到的 kafka.jar - 它似乎是由运行分布式工作程序的 CMD 文件添加到类路径中的 - 特别是 kafka-run-class.bat 并且它小于 1kb 所以似乎没有成为一个有效的 JAR - 所以也许第一个错误是一个红鲱鱼。但是,如果它继续运行,分布式工作人员只会吐出一堆关于 ClassNotFoundException 的错误,然后继续说我的分布式属性配置文件中提供的每个配置值都无法识别,并且不会尝试加载任何插件
原始问题
我只是在尝试一个非常简单的测试来启动一个 Kafka Connect 分布式工作程序,还没有插件 - 只是想让基本工作程序运行起来。
我的分布式配置非常丰富
bootstrap.servers=GBV04303950:9092,GBV04303950:9082
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
但是我收到以下错误
C:\kafka\ClusterOne\connect-worker_1>.\bin\windows\connect-distributed.bat .\etc\kafka-connect-replicator\replicator-connect-distributed.properties
[2019-03-22 10:53:51,832] WARN could not create Dir using jarFile from url file:/C:/kafka/ClusterOne/connect-worker_1/share/java/kafka/kafka.jar. skip
ping. (org.reflections.Reflections)
java.util.zip.ZipException: error in opening zip file
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(Unknown Source)
at java.util.zip.ZipFile.<init>(Unknown Source)
at java.util.jar.JarFile.<init>(Unknown Source)
at java.util.jar.JarFile.<init>(Unknown Source)
at org.reflections.vfs.Vfs$DefaultUrlTypes$1.createDir(Vfs.java:216)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:99)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:240)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.scan(DelegatingClassLoader.java:412)
at org.reflections.Reflections$1.run(Reflections.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
我能想到的唯一原因是我的本地驱动器被锁定,除非您具有提升的权限,否则不允许您创建文件夹。我从一个以提升权限开始的 CMD 提示符运行它,但正如我所注意到的,这并不总是转化为将这些权限传递给从 CMD 提示符中产生的进程。但我不确定如何为尝试解压缩 JAR 的 java 进程提供提升的权限。
此外,我认为该错误之后是一长串 ClassNotFoundException,因为上述错误意味着无法解压缩 JAR 以加载所有必需的类
例如
[2019-03-22 10:56:04,567] WARN could not get type for name org.apache.kafka.common.utils.MockTime from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.kafka.common.utils.MockTime
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:404)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:304)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:242)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:190)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:183)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:90)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.MockTime
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 10 more
[2019-03-22 10:56:04,706] WARN could not get type for name com.google.gson.JsonDeserializer from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.google.gson.JsonDeserializer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:404)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:304)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:242)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:190)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:183)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:90)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.lang.ClassNotFoundException: com.google.gson.JsonDeserializer
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 10 more
[2019-03-22 10:56:04,857] WARN could not get type for name org.scalatest.junit.JUnitSuite from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.scalatest.junit.JUnitSuite
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:404)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:304)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:242)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:190)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:183)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:90)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.lang.ClassNotFoundException: org.scalatest.junit.JUnitSuite
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 10 more
任何人都可以建议他们是否认为我关于提升权利的理论是为什么这不起作用或者我该如何解决这个问题?
解决方案
在 share/java/kafka 文件夹中有一个 kafka.jar 大约 26 字节。如果您使用文本编辑器查看它,您会发现它是一个链接。将链接的 jar 复制到 kafka.jar。
在 Windows 上,您还应该将 plugin.path 设置为绝对路径。启动时会出现一些错误,这是由于缺少 aws 和其他 jars。
推荐阅读
- mysql - 从模式创建表复合键
- java - Arquillian & Testcontainers:无效的 Oracle URL
- ios - 在 UIAlertController 中使用时无法设置 NSAttributedString 的前景色
- jspdf - 如何使用 AutoTable 创建不规则表格
- python - 获取命令行参数
- anaconda - 无法从 Anaconda Win10 打开 Spyder
- node.js - .npmrc 文件有什么用
- elasticsearch - 数学函数不适用于 ElasticSearch 脚本中的运算符
- ios13 - iOS 13:如何防止应用程序在后台被杀死?
- python - 如何在 Flask 应用程序中使用 Postgre 的 HSTORE 类型?