elasticsearch - 在 kafka-connect 接收器中提取字段和解析 JSON
问题描述
我有一个 mongodb->kafka connect->elasticsearch 的 kafka-connect 流程,可以端到端地发送数据,但有效负载文档是 JSON 编码的。这是我的源 mongodb 文档。
{
"_id": "1541527535911",
"enabled": true,
"price": 15.99,
"style": {
"color": "blue"
},
"tags": [
"shirt",
"summer"
]
}
这是我的 mongodb 源连接器配置:
{
"name": "redacted",
"config": {
"connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
"databases": "redacted.redacted",
"initial.import": "true",
"topic.prefix": "redacted",
"tasks.max": "8",
"batch.size": "1",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
"key.serializer.schemas.enable": false,
"value.serializer.schemas.enable": false,
"compression.type": "none",
"mongo.uri": "mongodb://redacted:27017/redacted",
"analyze.schema": false,
"schema.name": "__unused__",
"transforms": "RenameTopic",
"transforms.RenameTopic.type":
"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.RenameTopic.regex": "redacted.redacted_Redacted",
"transforms.RenameTopic.replacement": "redacted"
}
}
在 elasticsearch 中,它最终看起来像这样:
{
"_index" : "redacted",
"_type" : "kafka-connect",
"_id" : "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"1541527535911\"}",
"_score" : 1.0,
"_source" : {
"ts" : 1541527536,
"inc" : 2,
"id" : "1541527535911",
"database" : "redacted",
"op" : "i",
"object" : "{ \"_id\" : \"1541527535911\", \"price\" : 15.99,
\"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"],
\"style\" : { \"color\" : \"blue\" } }"
}
}
我想使用 2 个单消息转换:
ExtractField
抓取object
,这是一串JSON- 将 JSON 解析为对象或让普通的 JSONConverter 处理它的东西,只要它最终在弹性搜索中结构正确。
我尝试仅ExtractField
在我的接收器配置中执行此操作,但我看到 kafka 记录了此错误
kafka-connect_1 | org.apache.kafka.connect.errors.ConnectException:
Bulk request failed: [{"type":"mapper_parsing_exception",
"reason":"failed to parse",
"caused_by":{"type":"not_x_content_exception",
"reason":"Compressor detection can only be called on some xcontent bytes or
compressed xcontent bytes"}}]
这是我的弹性搜索接收器连接器配置。在这个版本中,我的工作正常,但我必须编写自定义 ParseJson SMT。它运行良好,但如果有更好的方法或方法可以结合一些内置的东西(转换器、SMT、任何工作)来做到这一点,我很乐意看到这一点。
{
"name": "redacted",
"config": {
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"batch.size": 1,
"connection.url": "http://redacted:9200",
"key.converter.schemas.enable": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"schema.ignore": true,
"tasks.max": "1",
"topics": "redacted",
"transforms": "ExtractFieldPayload,ExtractFieldObject,ParseJson,ReplaceId",
"transforms.ExtractFieldPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldPayload.field": "payload",
"transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldObject.field": "object",
"transforms.ParseJson.type": "reaction.kafka.connect.transforms.ParseJson",
"transforms.ReplaceId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceId.renames": "_id:id",
"type.name": "kafka-connect",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
解决方案
我不确定你的 Mongo 连接器。我不认识类或配置...大多数人可能使用Debezium Mongo 连接器
不过,我会这样设置
"connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
"key.serializer.schemas.enable": false,
"value.serializer.schemas.enable": true,
这schemas.enable
很重要,这样内部 Connect 数据类就可以知道如何转换为其他格式/从其他格式转换。
然后,在 Sink 中,您再次需要使用 JSON De Serializer(通过转换器),以便它创建一个完整的对象而不是纯文本字符串,正如您在 Elasticsearch ( {\"schema\":{\"type\":\"string\"
) 中看到的那样。
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
如果这不起作用,那么您可能必须提前在 Elasticsearch 中手动创建索引映射,以便它知道如何实际解析您发送给它的字符串
推荐阅读
- php - phpoffice/电子表格如何在图表中为每一列添加一个值
- node.js - 使用 Express 在 Node.js 中返回调用不起作用
- python - 使用列表中的索引值从大型数据帧创建较小的数据帧
- javascript - TypeScript 枚举到特定对象
- php - 无法打开 PHP,dyld:库未加载
- spring-boot - Spring-Kafka 似乎阻止 Spring Boot web 正常运行
- ibm-appid - 我无法在 Kubernetes 上安装 app-identity-and-access-adapter
- c++ - 多个可变参数函数的单个模板参数包?
- java - 使用 java RMI 传递复合对象
- c# - .NET Benchmark 中的 runtimeconfig.json 无效