Kafka Connect Elasticsearch sink - using an if-else block to extract and transform fields for different topics

Problem description

I have a Kafka Elasticsearch sink connector properties file, shown below:

name=elasticsearch.sink.direct
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=data.my_setting

connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=data.my_setting:direct_my_setting_index
batch.size=2048
max.buffered.records=32768
flush.timeout.ms=60000
max.retries=10
retry.backoff.ms=1000
schema.ignore=true
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID

This works perfectly for a single topic (data.my_setting). I would like to use the same connector for data coming from multiple topics. Messages in different topics will have different keys that I need to transform. I'm wondering whether there is a way to use an if-else statement, conditioned on the topic name or on a single field in the message, so that I can then transform the key differently per topic. All incoming messages are JSON with a schema and payload.

Update based on the answer

In my JDBC source connector, I add the key as follows:

name=data.my_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT * FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=investment.ed.data.app_setting

transforms=ValueToKey
transforms.ValueToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields=MY_SETTING_ID

However, when the Elasticsearch sink reads the messages produced by this connector, I still get an error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Caused by: org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id
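
For context, that DataException generally means the record key is still a whole Struct rather than a primitive. The first sink config above avoids this by chaining ExtractField$Key after ValueToKey, leaving just the MY_SETTING_ID value as the key. A minimal sketch of the same chain on the JDBC source side (the transform aliases are illustrative):

transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID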

The payload looks like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "MY_SETTING_ID"
      },
      {
        "type": "string",
        "optional": true,
        "field": "MY_SETTING_NAME"
      }
    ],
    "optional": false
  },
  "payload": {
    "MY_SETTING_ID": 9,
    "MY_SETTING_NAME": "setting_name"
  }
}

The Connect standalone worker properties file looks like this:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/apps/{env}/logs/infrastructure/offsets/connect.offsets
rest.port=8084
plugin.path=/usr/share/java

Is there a way to achieve my goal, which is to have messages from multiple topics (in my case, DB tables), each with its own unique id (which will also be the id of the document in ES), sent to a single ES sink?

Can I use Avro for this task? Is there a way to define the key in the Schema Registry, or will I run into the same problem?

Tags: elasticsearch, apache-kafka-connect, confluent-platform

Solution


This isn't possible with a single connector. You'd need multiple connectors if the key fields are different, for example one per topic, as sketched below.
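
Running one connector per topic/key field could look roughly like this (a sketch only; the second connector, its data.my_other topic, and the MY_OTHER_ID field are hypothetical stand-ins):

name=elasticsearch.sink.my_setting
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=data.my_setting
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID

name=elasticsearch.sink.my_other
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=data.my_other
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_OTHER_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_OTHER_ID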

One option to consider is pre-processing your Kafka topics through a stream processor (e.g. Kafka Streams, KSQL, Spark Streaming, etc.) to standardise the key fields, so that you can then use a single connector. Whether this is worth doing or is overkill depends on what you're building.
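
As a rough illustration of the stream-processor route, a Kafka Streams job could rekey each source topic on its own id field and write to a new topic that a single sink connector then consumes. This is a sketch under assumptions: values are JSON strings in the schema/payload envelope shown above, and the rekeyed topic name is made up.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RekeyApp {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekey-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Rekey data.my_setting on MY_SETTING_ID; repeat this pattern per topic,
        // each with its own id field, so all rekeyed topics share one key shape.
        KStream<String, String> settings = builder.stream("data.my_setting");
        settings.selectKey((oldKey, value) -> extractField(value, "MY_SETTING_ID"))
                .to("data.my_setting.rekeyed");

        new KafkaStreams(builder.build(), props).start();
    }

    // Pull one field out of the "payload" section of the JSON envelope as a string.
    private static String extractField(String json, String field) {
        try {
            return MAPPER.readTree(json).path("payload").path(field).asText();
        } catch (Exception e) {
            return null; // malformed record; a real job would route this elsewhere
        }
    }
}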

