java - java包中sink连接器的实现
问题描述
我已经启动了zookeeper,kafka服务器,kafka生产者和kafka消费者,我已经把从confluent下载的jdbc sql连接器jar放到了路径中,我在connect-standalone properties中提到了plugin.path。我已经运行了connect- Standalone.bat ....\config\connect-standalone.properties ....\config\sink-quickstart-mysql.properties 没有任何错误,但它有很多警告并且它没有开始,但我的数据没有得到反映在表格中。我错过了什么?你能帮我吗?我有以下警告
org.reflections.ReflectionsException: could not get type for name io.netty.inter
nal.tcnative.SSLPrivateKeyMethod
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$Inte
rnalReflections.<init>(DelegatingClassLoader.java:433)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
PluginPath(DelegatingClassLoader.java:325)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
UrlsAndAddPlugins(DelegatingClassLoader.java:261)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
PluginLoader(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
Loaders(DelegatingClassLoader.java:202)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.jav
a:60)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone
.java:79)
Caused by: java.lang.ClassNotFoundException: io.netty.internal.tcnative.SSLPriva
teKeyMethod
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
解决方案
除非您需要将 kafka 连接到一些奇特的数据源,否则无需自己编写源连接器。像mysql这样的流行工具已经很好地覆盖了。confluent 已经有一个“jdbc-connector”可以满足您的需求。
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html
您需要一个有效的 kafka-connect 安装,然后您可以通过 HTTP POST 到 kafka 连接 API 将您的 mysql 表“连接”到 kafka。tables.whitelist
只需在属性中指定要用作源的表的逗号分隔列表。例如,像这样的事情......
curl -X POST $KAFKA_CONNECT_API/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/test",
"connection.user": "connect_user",
"connection.password": "connect_password",
"topic.prefix": "mysql-01-",
"poll.interval.ms" : 3600000,
"table.whitelist" : "test.accounts",
"mode":"bulk"
}
}'