java - 在 Kafka Streams 中序列化 HashMap
问题描述
我正在尝试序列化HashMap<String, String>
哪些 DTO 属性最终会出现在 ElasticSearch 索引中。
我有这样的课:
class MyDto {
HashMap<String, String> properties;
}
我希望所有属性都直接_source
位于 ES 下:
_source: {
"firstPropKey": "firstValue",
"secondPropKey": "secondVal"
...
}
而不是这个:
_source: {
properties: {
"firstPropKey": "firstValue",
"secondPropKey": "secondVal"
...
}
}
我尝试了各种实现,但无法使其工作。这是一种方法:
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(dataTopicName, Consumed.with(Serdes.String(), dtoSerde))
.map((key, myDTO) -> new KeyValue<>(id, myDTO.properties))
.to(elasticIndexTopicName, Produced.with(Serdes.String(), dtoPropsSerde));
问题是它dto.properties
被HashMap
参数化<String, String>
并且Jackson2Serde
不允许参数化类,所以我需要让它像这样:
private final Jackson2Serde<HashMap> dtoPropsSerde = new Jackson2Serde<>(mapper, HashMap.class);
但后来我得到类型错误,更具体地说:Cannot resolve method to(String, Produced<K,V>)
如果我制作dto.properties
旧的未参数化 HashMap,仍然无法正常工作。在我的日志中找不到任何有用的东西。我错过了什么?
编辑:值得注意的是,我在elasticIndexTopicName
ElasticSearch sink connect 正在阅读的主题中没有得到任何信息。
解决方案
推荐阅读
- c# - 在 C# 中通过 TCP 套接字方法发送位类型数据
- java - AES 解密期间出错。非法块大小异常
- android - 关闭警报对话框
- wordpress - 复杂重定向的组合或重写规则
- usb - Windows 7 虚拟机无法识别具有最新 Virtualbox 扩展的 USB 密钥和该密钥的过滤器
- android - 问题在 Honor 6x 中不可见项目(带有 StaggeredGridLayoutManager 的 Recyclerview)
- android - Google Apps 脚本:将数据附加到电子表格(如果尚未存在)
- python - 井字游戏python的GUI逻辑
- python - 使用 GPU Nvidia 的张量流
- django - Django - 从用户名获取用户 ID