spring - "WindowedBy Count KStream" throws StreamsException
问题描述
I tried to count event from KStream, into time period:
KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));
KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());
KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));
streamValueKey.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
.count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));
I've this exception:
Exception in thread "test-app-87ce164d-c427-4dcf-aa76-aeeb6f8fc943-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=vehicle, partition=0, offset=160385 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318) at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409) at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
解决方案
groupByKey()
使用默认的序列化器:
groupByKey()
按当前键将记录分组到 KGroupedStream 中,同时保留原始值和默认序列化器和反序列化器。
您必须使用groupByKey(Serialized<K,V> serialized)
或groupByKey(Grouped<K,V> grouped)
。
以下应该可以解决问题:
streamValueKey.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
.count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));
推荐阅读
- android - 用文本区域填充容器
- python - 阻止小部件相互重叠并在它们之间添加空间
- node.js - 节点提供的 ejs 文件可以使用离线引导程序吗?
- python - 我可以使用 pynput 将变量插入键盘按下吗?
- java - WildFly 11 以编程方式更改日志级别
- swift - 将操作分配给 UITextField 并更新 UILabel
- jasmine - 保护器或茉莉花中的 onprepare 和 onComplete 是什么?
- react-native - 错误:捆绑失败:ReferenceError:模块未在图中注册
- ios - 无法满足 UILabel 对动态 UITableViewCell 的 ContentView 的底部约束
- python - Python dlib - 读取图像而不是网络摄像头