apache-kafka - 如何在 Apache kafka 中启用 SFTP 源连接器?
问题描述
我正在尝试在 Apache kafka 中启用 SFTP csv 源连接器,然后在 confluent-platform 中使用它,所以我试图在 Apache 变体中执行相同的操作。
但我无法达到同样的效果
我为启用 SFTP csv 源连接器所做的步骤。
1)从以下链接下载了 zip 文件中的 kafka-connect-sftp。
https://www.confluent.io/hub/confluentinc/kafka-connect-sftp
2)解压压缩文件夹并将kafka-connect-sftp的libs文件夹复制到名为sftp的新文件夹下的Apache kafka libs文件夹
3) 更新了 connect-distributor.properties 插件路径,如下所示
plugin.path=/k_con_test/libs,/k_con_test/libs/sftp
4)当我尝试重新启动连接服务时,我在连接控制台中遇到了很多错误
[2021-04-09 14:51:26,305] WARN could not get type for name io.confluent.connect.storage.format.RecordWriterProvider from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.format.RecordWriterProvider
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.format.RecordWriterProvider
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 11 more
[2021-04-09 14:51:26,306] WARN could not get type for name io.confluent.connect.storage.format.RecordWriter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.format.RecordWriter
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.format.RecordWriter
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 11 more
[2021-04-09 14:51:26,307] WARN could not get type for name io.confluent.connect.storage.StorageSinkConnectorConfig from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.StorageSinkConnectorConfig
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.StorageSinkConnectorConfig
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 11 more
[2021-04-09 14:51:26,310] WARN could not get type for name org.apache.parquet.io.OutputFile from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.parquet.io.OutputFile
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.io.OutputFile
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 11 more
[2021-04-09 14:51:26,311] WARN could not get type for name org.apache.parquet.io.PositionOutputStream from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.parquet.io.PositionOutputStream
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.io.PositionOutputStream
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 11 more
[2021-04-09 14:51:26,313] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:96)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
at java.base/java.lang.Class.getConstructor0(Class.java:3342)
at java.base/java.lang.Class.newInstance(Class.java:556)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:395)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.StorageSinkConnectorConfig
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 24 more
无法粘贴整个日志,因为它的行数非常多
我可以知道如何在 Apache Kafka 中设置这些连接器吗
因为我不认为 apache 没有特定的 sftp 连接器,所以可以在这里使用 confluent 连接器吗?
解决方案
推荐阅读
- html - 如何将导航栏品牌和导航栏与 bootstrap 5 对齐?
- html - 绝对位置使背景图像消失,我不知道为什么
- vuejs2 - 如何在nuxt,vue的fetch钩子中调用多个函数
- ansible - 无法在 Ansible shell 模块中设置超时
- python - Dask延迟总和被杀死但有足够的资源
- arrays - 如何使用 React.js 中的 i18next 翻译与 .map 映射 .json 文件?
- python - 卡在机器上进行渗透测试学习,可能是 sudo 漏洞?
- algorithm - FIFO 何时胜过 LRU 替换算法?
- c++ - 关于 C++ 名称查找的困惑
- c# - Pascal 代码到 C# 代码的转换(ord 和 chr)