java - kafka 流序列化错误:java.lang.String 无法转换为 [Byte],使用 json 数据时出现问题
问题描述
我是 kafka 流的新手,当我尝试对 json 数据进行聚合时遇到问题。下面是我的流代码,我复制了下面的代码和示例输入以及错误,我使用的是 kafka 版本:2.12。我已经尝试了几天的各种实现,现在陷入了这个错误。错误,无法继续。有人可以帮忙吗?
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
Properties props=new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank_stream_001");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final Serializer<JsonNode> jsonSerialize=new JsonSerializer();
final Deserializer<JsonNode> jsonDeserialize= new JsonDeserializer();
final Serde<JsonNode> jsonSerd= Serdes.serdeFrom(jsonSerialize, jsonDeserialize);
String inputtopic = "bank_topic1";
String outputtopic = "bankout_topic1";
StreamsBuilder builder =new StreamsBuilder();
//stream for reading
KStream<String,JsonNode> stream1=builder.stream(inputtopic, Consumed.with(Serdes.String(), jsonSerd));
//create balance node for caluclations
ObjectNode initial_balance= JsonNodeFactory.instance.objectNode();
initial_balance.put("count",0);
initial_balance.put("balance", 0);
initial_balance.put("time", Instant.ofEpochMilli(0L).toString());
stream1.peek((k,v) -> System.out.println(k+" : "+v));
KTable<String, JsonNode> bank_bal= stream1.groupByKey()
.aggregate(
() -> initial_balance,
(key,value,cur_balance) -> newbalance(value,cur_balance)
);
bank_bal.toStream().peek((k,v) -> System.out.println(k+" : "+v));
bank_bal.toStream().to(outputtopic, Produced.with(Serdes.String(), jsonSerd));
KafkaStreams streams = new KafkaStreams(builder.build(),props);
streams.cleanUp();
streams.start();
System.out.println(stream1.toString());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static JsonNode newbalance(JsonNode transaction,JsonNode balance) {
ObjectNode newbal = JsonNodeFactory.instance.objectNode();
newbal.put("count", balance.get("count").asInt()+1);
newbal.put("balance", balance.get("balance").asInt()+transaction.get("amount").asInt());
Long balanceEpoch= Instant.parse(balance.get("time").asText()).toEpochMilli();
Long transEpoch =Instant.parse(transaction.get("time").asText()).toEpochMilli();
Instant newbalanceinstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transEpoch));
newbal.put("time", newbalanceinstant.toString());
return newbal;
}
我在主题中的数据:bank_topic1 是
aruna {"name":"aruna","amount":58,"time":"2021-02-26T15:12:43.811Z"}
varun {"name":"varun","amount":3,"time":"2021-02-26T15:12:45.081Z"}
kali {"name":"kali","amount":16,"time":"2021-02-26T15:12:46.082Z"}
aruna {"name":"aruna","amount":80,"time":"2021-02-26T15:13:32.806Z"}
varun {"name":"varun","amount":33,"time":"2021-02-26T15:13:34.015Z"}
kali {"name":"kali","amount":30,"time":"2021-02-26T15:13:35.016Z"}
由于serde,我收到错误。
java.lang.ClassCastException: java.lang.String cannot be cast to [B at
org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
你能帮忙吗?
解决方案
推荐阅读
- ms-access - 如何编写查询以获得我想要的结果?
- azure - 在 Azure Powershell 函数中使用 Azure CLI
- twig - Twig 在内部使用 htmlspecialchars 进行转义。我如何通过 ENT_NOQUOTES?
- git - 带有 git 哈希对象的 git ls-tree
- javascript - 无法安装 npm 包 - bcrypt@3.0.6 install: `node-pre-gyp install --fallback-to-build` in Ubuntu 18.04
- python-3.x - 如何在此 html 代码中使用 BeautifulSoup 获取文本:c#
- ios - 选择添加到 SwiftUI 列表的新项目
- api - 如何使用 Filenet 内容引擎中的文档 ID 从文档中检索多个图像
- linux - git diff --numstat 通过字符串而不是文件路径
- google-chrome-devtools - 使用 Puppeteer,如何获取 Chrome DevTools 的“网络”选项卡的计时信息?