首页 > 解决方案 > Datastax Kafka 连接器无法解析 Json 主题

问题描述

我有一个生产者类使用来自Github的自定义 JsonSerializer 发送到一个主题

public class JsonSerializer<T> implements Serializer<T> {
    ...
    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return this.objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
    ...
}

我正在使用以下配置运行 Datastax Kafka 连接器:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

连接器尝试使用主题时出现这些错误:

[2020-01-12 13:57:53,324] WARN Error inserting/updating row for Kafka record SinkRecord{kafkaOffset=416, timestampType=CreateTime} ConnectRecord{topic='test-3', kafkaPartition=17, key=null, keySchema=Schema{STRING}, value={}, valueSchema=null, timestamp=1578811437723, headers=ConnectHeaders(headers=)}: Primary key column(s) mmsi, ts cannot be left unmapped. Check that your mapping setting matches your dataset contents. (com.datastax.kafkaconnector.DseSinkTask:286)

从那个错误中,我认为连接器无法检索 Json 数据。我究竟做错了什么?

更新

我尝试了 Kafka JsonSerializer。

我尝试了 StringSerializer,因为连接器说它也受支持。

我发现一些数据实际上写入了数据库,但与 kafka 主题发送的总数据相比,它总是相对较小的数字。大约 5 到 10 个数据。

我试图保持连接器运行,我发现它写入失败后,它不会再写入了。

标签: javaapache-kafkacassandradatastaxapache-kafka-connect

解决方案


实际上这是与配置相关的问题。正如我在更新中提到的,它不再写入数据以防出错。

这是因为 Datastax 具有ignoreErrors那些具有默认值的配置false。这意味着如果连接器在消息中发现错误,它将无限期地重试。我将其设置为true,问题就解决了。


推荐阅读