首页 > 解决方案 > Kafka流中的映射操作后双引号中的字符串

问题描述

我使用 Kafka 流 DSL 和映射将 a 转换KStream<String, JsonNode>KStream<String, String>.

ValueMapper函数中,我简单地 return new ValueMapper("key", "some constant string"),但是在将值发送回 Kafka 使用的地方KStream.to("some topic"),我得到的结果是添加了双引号。

我的代码是这样的:

KStream<String, JsonNode> views = builder.stream("fromTopic");
views.map(new ValueKMapper()).to("toTopic");

并且ValueMapper简单地实现KeyValueMapper并且代码apply()是:

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

然后当我使用 , 时toTopic,我得到了""hello"", 添加了引号。

也许这是Kafka流的错误?

标签: apache-kafkaapache-kafka-streams

解决方案


我假设apply()您在问题中提供的方法

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

没有将硬编码的键和值传递给KeyValue的构造函数。我的猜测是这个问题与JsonNode. 也许,您的方法的实际实现使用value.get(key)

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key));
}

但是,value.get(key)将返回 aTextNode并且该方法将返回包含引号toString()的字符串表示形式。TextNode为了JsonNode正确解析,您需要使用textValue()方法,因此您的方法将变为

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key).textValue());
}

示例: 假设您有一个 keya和一个 value hello

json.get("a").toString())

将返回"hello"

json.get("a").textValue();

将返回hello


推荐阅读