首页 > 解决方案 > Kafka Mysql CDC 到弹性搜索

问题描述

我试图通过将我的MySql数据库更改外包给 kafka 主题source-connector,这很有效。现在我想将这些数据发送到弹性搜索实例。

为此,我关注了这个Kafka Connect Elasticsearch:Consuming and Indexing with Kafka Connect和这个Kafka Connect and Elasticsearch

对于 mysql 到 kafka 的 CDC,我可以看到我在 mysql 中所做的更改并读取它创建一个源连接器,但是当我创建另一个连接elasticsearch-sink器连接器时,显示source-connectr task.stateFailed!因此,尽管在 es-config.properties 文件中创建索引作为设置,但数据库更改不会进入 ES。

我已将 jar/s 放在 kafka-dir 中,其中源连接器的 lib 工作(以避免有关类路径问题的进一步问题)。

创建时elaticsearch-sink-connector出现此错误(尽管当然,我没有错误,并且所有库都在同一目录中!):

错误连接器的插件类加载器:找不到“io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”。返回:org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@5cc126dc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)

我正在运行我的连接器:

bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties

简而言之,我的连接器 task.stateRUNNING一次只保留一个

编辑:
plugin.path对于 connect-standablone.properties 文件:

plugin.path=/media/***/projects/playground/kafka/kafka_2.12-2.4.0, /media/***/projects/playground/kafka/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java

它们都包含 es-connector jar.Last 一个稍后添加但仍然相同

我现在该怎么办?

标签: elasticsearchapache-kafkaapache-kafka-connect

解决方案


当我将 schema.enable 更改为 false 时,事情就像魅力一样

key.converter.schemas.enable=false
value.converter.schemas.enable=false

/并在plugin.path 没有它的情况下添加了额外的/源连接器!

编辑:我忘了提到我已经用 5.4.0 版本替换了我的连接器版本,因为 cricket_007 提到了

Edit-2: 我后来进行了更多调查,发现,额外的/问题以及下面提到的新关键属性帮助我摆脱了FAILED连接器的状态(一次只有一个连接器正在运行):

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

在 connect-standalone.properties 文件中

谢谢


推荐阅读