java - Datastax Kafka 连接器无法解析 Json 主题
问题描述
我有一个生产者类使用来自Github的自定义 JsonSerializer 发送到一个主题
public class JsonSerializer<T> implements Serializer<T> {
...
@Override
public byte[] serialize(String topic, T data) {
try {
return this.objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
...
}
我正在使用以下配置运行 Datastax Kafka 连接器:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
连接器尝试使用主题时出现这些错误:
[2020-01-12 13:57:53,324] WARN Error inserting/updating row for Kafka record SinkRecord{kafkaOffset=416, timestampType=CreateTime} ConnectRecord{topic='test-3', kafkaPartition=17, key=null, keySchema=Schema{STRING}, value={}, valueSchema=null, timestamp=1578811437723, headers=ConnectHeaders(headers=)}: Primary key column(s) mmsi, ts cannot be left unmapped. Check that your mapping setting matches your dataset contents. (com.datastax.kafkaconnector.DseSinkTask:286)
从那个错误中,我认为连接器无法检索 Json 数据。我究竟做错了什么?
更新
我尝试了 Kafka JsonSerializer。
我尝试了 StringSerializer,因为连接器说它也受支持。
我发现一些数据实际上写入了数据库,但与 kafka 主题发送的总数据相比,它总是相对较小的数字。大约 5 到 10 个数据。
我试图保持连接器运行,我发现它写入失败后,它不会再写入了。
解决方案
实际上这是与配置相关的问题。正如我在更新中提到的,它不再写入数据以防出错。
这是因为 Datastax 具有ignoreErrors
那些具有默认值的配置false
。这意味着如果连接器在消息中发现错误,它将无限期地重试。我将其设置为true,问题就解决了。
推荐阅读
- ios - iOS手表应用程序音频在后台模式下不起作用
- python - 评论没有被删除
- opencv - 直接基于旋转R和平移T的立体校准,而不是使用点对应
- python - 在 python pandas 中创建新的日期列
- android - TextView wrap_content 在回收站视图项中不起作用
- sql - 自动创建 Sybase 用户
- excel - 使用 VBA 对随机表进行排序
- javascript - Google 表格和外部 API
- hyperledger-fabric - 在多通道多链码和多账本 Fabric 网络上访问账本
- java - 如何在构建时从 .class 文件中的 gradle.properties 获取值