首页 > 解决方案 > 在 Kafka Streams 中序列化 HashMap

问题描述

我正在尝试序列化HashMap<String, String>哪些 DTO 属性最终会出现在 ElasticSearch 索引中。

我有这样的课:

class MyDto {
    HashMap<String, String> properties;
}

我希望所有属性都直接_source位于 ES 下:

_source: {
    "firstPropKey": "firstValue",
    "secondPropKey": "secondVal"
    ...
}

而不是这个:

_source: {
    properties: {
       "firstPropKey": "firstValue",
       "secondPropKey": "secondVal"
       ...
    }
}

我尝试了各种实现,但无法使其工作。这是一种方法:

final StreamsBuilder builder = new StreamsBuilder();

builder.stream(dataTopicName, Consumed.with(Serdes.String(), dtoSerde))
    .map((key, myDTO) -> new KeyValue<>(id, myDTO.properties))
    .to(elasticIndexTopicName, Produced.with(Serdes.String(), dtoPropsSerde));

问题是它dto.propertiesHashMap参数化<String, String>并且Jackson2Serde不允许参数化类,所以我需要让它像这样:

private final Jackson2Serde<HashMap> dtoPropsSerde = new Jackson2Serde<>(mapper, HashMap.class);

但后来我得到类型错误,更具体地说:Cannot resolve method to(String, Produced<K,V>)

如果我制作dto.properties旧的未参数化 HashMap,仍然无法正常工作。在我的日志中找不到任何有用的东西。我错过了什么?

编辑:值得注意的是,我在elasticIndexTopicNameElasticSearch sink connect 正在阅读的主题中没有得到任何信息。

标签: javahashmapjacksonapache-kafka-streams

解决方案


推荐阅读