首页 > 解决方案 > 将数据从 Kafka 流式传输到 MongDB 时如何为非 json 格式消息配置接收器连接器

问题描述

我可以将数据从 Kafka 流式传输到 MongoDB。

kafka-console-producer.sh使用 json 消息流式传输(例如:){"id": 1, "test": 123}。它已成功插入 MongoDB。

但是,如果我流式传输消息(例如:)abc。然后,我会因为json解析而得到错误

[2019-07-09 14:44:30,365] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'abc': was expecting ('true', 'false
' or 'null')
 at [Source: (byte[])"abc"; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'abc': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"abc"; line: 1, column: 7]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
        at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-07-09 14:44:30,374] ERROR WorkerSinkTask{id=mongo-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.Wor
kerTask)

接收器连接器配置文件MongoSinkConnector.properties包含内容(我基于此创建它)

name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

我有 2 个单独的问题:

  1. 如何忽略非json格式的消息?

  2. 如何为这种消息定义一个默认键(例如:abc->{ "non-json": "abc" }

标签: mongodbapache-kafkaapache-kafka-connect

解决方案


字符串是“JSON 格式”,但是,JsonConverter通常需要 JSON对象,或者在某些情况下null,不是纯字符串。

没有为错误设置默认值的属性,但是您可以简单地跳过事件,或者将它们发送到不同的主题,如果您希望它们也最终进入数据库,则必须单独处理和修复消息。这是使用errors.tolerance设置配置的(仅自 Kafka Connect 2.0 起可用)

参考 - https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

如果数据只是"abc",那么我建议使用StringConverter而不是JsonConverter无论如何

但是,如果您想将所有事件包装在 JSON 对象中(而不仅仅是纯字符串),则必须添加transforms属性

transforms=MakeMap 
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value 
transforms.MakeMap.field=non-json 

这需要"abc"to{"non-json":"abc"} {"id": 1, "test": 123}to {"non-json":{"id": 1, "test": 123}},所以你需要小心你在主题中产生的内容。


推荐阅读