elasticsearch - Kafka Connect Elasticsearch sink - using an if-else block to extract and transform fields for different topics
Problem Description
I have a Kafka Elasticsearch sink properties file as follows:
name=elasticsearch.sink.direct
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=data.my_setting
connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=data.my_setting:direct_my_setting_index
batch.size=2048
max.buffered.records=32768
flush.timeout.ms=60000
max.retries=10
retry.backoff.ms=1000
schema.ignore=true
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID
This works perfectly for a single topic (data.my_setting). I'd like to use the same connector for data coming from multiple topics. Messages on different topics will have different keys that I need to transform. Is there a way to use an if-else style condition, based on the topic name or on a single field in the message, so that I can transform the key differently per topic? All incoming messages are JSON with a schema and payload.
Update based on the answer:
In my JDBC connector I add the key as follows:
name=data.my_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT * FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=investment.ed.data.app_setting
transforms=ValueToKey
transforms.ValueToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields=MY_SETTING_ID
However, when the Elasticsearch sink reads the messages produced by this connector, I still get the error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Caused by: org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id
The payload looks like this:
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "MY_SETTING_ID"
}, {
"type": "string",
"optional": true,
"field": "MY_SETTING_NAME"
}
],
"optional": false
},
"payload": {
"MY_SETTING_ID": 9,
"MY_SETTING_NAME": "setting_name"
}
}
Connect standalone property file looks like this:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/apps/{env}/logs/infrastructure/offsets/connect.offsets
rest.port=8084
plugin.path=/usr/share/java
Is there a way to achieve my goal, which is to have messages from multiple topics (in my case DB tables), each with its own unique id (which will also be the id of the document in ES), sent to a single ES sink?
Can I use Avro for this task? Is there a way to define the key in Schema Registry, or will I run into the same problem?
Solution
This isn't possible. You'd need multiple Connectors if the key fields are different.
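For example, each sink connector would carry its own key-extraction transforms. This is a hypothetical sketch: the second topic name data.other_setting, its index name, and its key field OTHER_SETTING_ID are placeholders, not names from the question:

```properties
# Hypothetical second sink for a topic whose key field differs.
# Topic, index, and field names are illustrative placeholders.
name=elasticsearch.sink.other
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=data.other_setting
connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=data.other_setting:direct_other_setting_index
schema.ignore=true
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=OTHER_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=OTHER_SETTING_ID
```

Both connectors can point at the same Elasticsearch cluster; only the transforms (and topic/index mapping) differ.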
One option to think about is pre-processing your Kafka topics through a stream processor (e.g. Kafka Streams, KSQL, Spark Streaming, etc.) to standardise the key fields, so that you can then use a single connector. Whether this is worth doing or overkill depends on what you're building.
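As a sketch of the stream-processing route, KSQL can re-key each source topic before the connector reads it. Stream names here are illustrative assumptions; the column declarations mirror the JSON schema shown in the question:

```sql
-- Register the raw topic as a stream (illustrative names; JSON needs
-- the columns declared explicitly).
CREATE STREAM my_setting_raw (MY_SETTING_ID INT, MY_SETTING_NAME VARCHAR)
  WITH (KAFKA_TOPIC='data.my_setting', VALUE_FORMAT='JSON');

-- Write a re-keyed copy: PARTITION BY sets the message key, so one
-- ES sink can consume all the re-keyed output topics uniformly.
CREATE STREAM my_setting_keyed AS
  SELECT * FROM my_setting_raw
  PARTITION BY MY_SETTING_ID;
```

Repeating this per source topic (each with its own id column) yields a set of topics whose keys are already plain ids, which a single sink connector can then index without per-topic transforms.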