首页 > 解决方案 > 如何在 Apache kafka 中启用 SFTP 源连接器?

问题描述

我正在尝试在 Apache kafka 中启用 SFTP csv 源连接器,然后在 confluent-platform 中使用它,所以我试图在 Apache 变体中执行相同的操作。

但我无法达到同样的效果

我为启用 SFTP csv 源连接器所做的步骤。

1)从以下链接下载了 zip 文件中的 kafka-connect-sftp。

https://www.confluent.io/hub/confluentinc/kafka-connect-sftp

2)解压压缩文件夹并将kafka-connect-sftp的libs文件夹复制到名为sftp的新文件夹下的Apache kafka libs文件夹

3) 更新了 connect-distributor.properties 插件路径,如下所示

plugin.path=/k_con_test/libs,/k_con_test/libs/sftp

4)当我尝试重新启动连接服务时,我在连接控制台中遇到了很多错误

[2021-04-09 14:51:26,305] WARN could not get type for name io.confluent.connect.storage.format.RecordWriterProvider from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.format.RecordWriterProvider
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.format.RecordWriterProvider
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 11 more
[2021-04-09 14:51:26,306] WARN could not get type for name io.confluent.connect.storage.format.RecordWriter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.format.RecordWriter
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.format.RecordWriter
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 11 more
[2021-04-09 14:51:26,307] WARN could not get type for name io.confluent.connect.storage.StorageSinkConnectorConfig from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name io.confluent.connect.storage.StorageSinkConnectorConfig
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.StorageSinkConnectorConfig
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 11 more
[2021-04-09 14:51:26,310] WARN could not get type for name org.apache.parquet.io.OutputFile from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.parquet.io.OutputFile
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.io.OutputFile
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 11 more
[2021-04-09 14:51:26,311] WARN could not get type for name org.apache.parquet.io.PositionOutputStream from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.parquet.io.PositionOutputStream
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.io.PositionOutputStream
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 11 more
[2021-04-09 14:51:26,313] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:96)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
        at java.base/java.lang.Class.getConstructor0(Class.java:3342)
        at java.base/java.lang.Class.newInstance(Class.java:556)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:395)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:365)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:260)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:229)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:206)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.StorageSinkConnectorConfig
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        ... 24 more

无法粘贴整个日志,因为它的行数非常多

我可以知道如何在 Apache Kafka 中设置这些连接器吗

因为我不认为 apache 没有特定的 sftp 连接器,所以可以在这里使用 confluent 连接器吗?

标签: apache-kafkaapache-kafka-connect

解决方案


推荐阅读