apache-kafka-streams - 聚合上使用了错误的序列化程序
问题描述
我正在处理一个处理日志事件的 kafka-streams 应用程序。在这种情况下,我想将 WorkflowInput 类型聚合为 Workflow 类型。我在使聚合工作时遇到问题。
final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
.peek((k, v) -> {
if (!(v instanceof WorkflowInput)) {
throw new AssertionError("Type not expected");
}
})
.groupByKey()
.<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));
我得到以下异常:由以下原因引起:org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).
有两件事需要注意: * 值序列化器是一个 StringSerializer,而我使用withValueSerde
. * 实际值类型是WorkflowInput
我所期望的Workflow
,因为那是我的聚合值类型。
我是 kafka-streams 的新手,所以我可能会遗漏一些明显的东西,但我无法弄清楚。我在这里想念什么?
解决方案
如果您Serde
从配置中覆盖默认值,则它在操作员就地覆盖。它不会向下游传播(Kafka 2.0——有 WIP 来改进它)。
因此,您也需要将Serde
您使用的 s传递someStream = builder.stream(...)
到.groupByKey(Serialized.with(...))
中。
推荐阅读
- python - 有没有比 np.diff 更快的替代方案?
- r - R googleway:在地图上添加标题?
- java - 如何创建像 Clipboard Pro App 这样的浮动窗口?
- vue.js - 我可以在 bootstrap-vue 项目中安全地包含 bootstrap.js 吗?
- c - 将指向结构的指针作为参数传递
- rest - 是否有来自官方文档的 sharepoint online rest api 的完整列表?
- jenkins - 如何使用作业 dsl 创建一个运行 groovy 代码的作业?
- python - 如何浏览到带有情节破折号的文件夹?
- angular - 为什么页脚组件在其他组件之前加载?
- oracle - 我正在为学生注册系统制作 PL/SQL 触发器,但有一个可怕的问题