首页 > 解决方案 > 无法使用 Kafka connect json api 使用 Json 消息

问题描述

我使用 kafka 的代码,它没有打印任何输出

我正在尝试通过 kafka 使用 json 消息,但没有提供任何输入,这是我的输入:{"opp_id":"2","opp_tenant_id":"3","createdWhen":"4"} 我能够接收作为字符串反序列化器,但不使用 Jsondeserializer

非常感谢任何帮助,谢谢。

ObjectMapper mapper = new ObjectMapper();

KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<String, JsonNode>(props);
    try {
    while (true) {
        ConsumerRecords<String, JsonNode> records = consumer.poll(100); 
        for (ConsumerRecord<String, JsonNode> record : records) {               
            JsonNode jsonNode = record.value();
       System.out.println(mapper.treeToValue(jsonNode,Contact.class));
        }
            }
    }

波乔,

public class Contact {
    private String opp_id;
    private String opp_tenant_id;
    private String createdWhen;

public Contact(String opp_id,String opp_tenant_id, String createdWhen)
{
    this.opp_id =opp_id;
    this.opp_tenant_id = opp_tenant_id;
    this.createdWhen = createdWhen;

}

public void parseString(String csvStr){
    StringTokenizer st = new StringTokenizer(csvStr,",");
    opp_id = st.nextToken();
    opp_tenant_id = st.nextToken();
    createdWhen = st.nextToken();
}

public Contact() {}

public String getopp_id(){
    return opp_id;
}
public Contact setopp_id(String opp_id) {
    this.opp_id=opp_id;
    return this;
}
public String getopp_tenant_id(){
    return opp_tenant_id;
}
public Contact setopp_tenant_id(String opp_tenant_id) {
    this.opp_tenant_id=opp_tenant_id;
    return this;
}

public String getcreatedWhen(){
    return createdWhen;
}
public Contact setcreatedWhen(String createdWhen) {
    this.createdWhen=createdWhen;
    return this;
}

标签: javaapache-kafka

解决方案


Apache kafka 没有任何可用的反序列化JsonDeserializer类,并且所有可用的反序列化类都在 apache kafka 中ByteArrayDeserializer, IntegerDeserializer, LongDeserializer, StringDeserializer(有关更多信息here),因此您可以使用StringDeserializer以字符串 JSON 格式读取消息并用于Object mapper将 JSON 字符串转换为 POJO 类,或者如果您想使用JsonDeserializer更喜欢 Spring卡夫卡


推荐阅读