java - 无法使用 Kafka connect json api 使用 Json 消息
问题描述
我使用 kafka 的代码,它没有打印任何输出
我正在尝试通过 kafka 使用 json 消息,但没有提供任何输入,这是我的输入:{"opp_id":"2","opp_tenant_id":"3","createdWhen":"4"} 我能够接收作为字符串反序列化器,但不使用 Jsondeserializer
非常感谢任何帮助,谢谢。
ObjectMapper mapper = new ObjectMapper();
KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<String, JsonNode>(props);
try {
while (true) {
ConsumerRecords<String, JsonNode> records = consumer.poll(100);
for (ConsumerRecord<String, JsonNode> record : records) {
JsonNode jsonNode = record.value();
System.out.println(mapper.treeToValue(jsonNode,Contact.class));
}
}
}
波乔,
public class Contact {
private String opp_id;
private String opp_tenant_id;
private String createdWhen;
public Contact(String opp_id,String opp_tenant_id, String createdWhen)
{
this.opp_id =opp_id;
this.opp_tenant_id = opp_tenant_id;
this.createdWhen = createdWhen;
}
public void parseString(String csvStr){
StringTokenizer st = new StringTokenizer(csvStr,",");
opp_id = st.nextToken();
opp_tenant_id = st.nextToken();
createdWhen = st.nextToken();
}
public Contact() {}
public String getopp_id(){
return opp_id;
}
public Contact setopp_id(String opp_id) {
this.opp_id=opp_id;
return this;
}
public String getopp_tenant_id(){
return opp_tenant_id;
}
public Contact setopp_tenant_id(String opp_tenant_id) {
this.opp_tenant_id=opp_tenant_id;
return this;
}
public String getcreatedWhen(){
return createdWhen;
}
public Contact setcreatedWhen(String createdWhen) {
this.createdWhen=createdWhen;
return this;
}
解决方案
Apache kafka 没有任何可用的反序列化JsonDeserializer
类,并且所有可用的反序列化类都在 apache kafka 中ByteArrayDeserializer, IntegerDeserializer, LongDeserializer, StringDeserializer
(有关更多信息here),因此您可以使用StringDeserializer
以字符串 JSON 格式读取消息并用于Object mapper
将 JSON 字符串转换为 POJO 类,或者如果您想使用JsonDeserializer
更喜欢 Spring卡夫卡
推荐阅读
- html - 悬停时更改项目位置
- python - Python serial.write()不适用于NodeMCU
- octave - gnu八度中gnu的含义?
- html - 设置默认值
基于列表的布尔条件 - networking - 使用 url 单独调节流量
- arrays - 如何添加从 c 中的文件编号读取的二维数组组?
- sql - 加入 2 个不同大小的数组
- typescript - 如何访问 `setup` 的内部状态而不通过`data` 暴露它?
- c# - 无法通过实体框架查询 SQL Server“没有为此 DbContext 配置数据库提供程序”
- apache-spark - 将具有 oracle blob 列的 spark-sql 转换为 java byte[] 的问题