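serialization - Serialization error in Serdes when running a Streams application
Problem description
I am writing an application that processes incoming coordinate records and monitors when they enter or exit defined geofences. In between, I use an aggregation to maintain the state of whether the current coordinate is inside or outside a particular geofence.
Below is the stack trace of the error I get when running it: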
Exception in thread "geofence-events-990be707-d44d-410d-a8a7-b2a3b90cb84f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000003, topic=vehicle-sensor-data, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.loconav.GeoFenceDTO). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
... 5 more
Caused by: java.lang.ClassCastException: class com.loconav.GeoFenceDTO cannot be cast to class java.lang.String (com.loconav.GeoFenceDTO is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 45 more
I added a peek after flatMapValues; the first record is printed and then this exception is thrown. Below is the topology code:
vehicleGeoFences
    .flatMapValues(
        (readOnlyKey, value) -> {
            List<GeoFenceDTO> geoFenceDTOS = new ArrayList<>();
            // some logic
            return geoFenceDTOS;
        })
    .groupBy((key, value) -> value.getGeofenceId() + "," + value.getVehicleId())
    .aggregate(
        GeoFenceDTO::new,
        (key, value, aggregate) -> {
            // setting values
            return geofenceDTO;
        },
        Materialized.with(Serdes.String(), getGeofenceDTOValueSerde()))
    .toStream()
    .mapValues(
        (readOnlyKey, value) -> {
            // changing object
        })
    .to(
        Topics.VEHICLE_EVENTS.name(),
        Produced.with(vehicleEventsKeySerde, vehicleEventsValueSerde));
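I wrote a custom Serde for GeofenceDTO rather than using a Jackson ObjectMapper to convert the object to bytes and back. Somehow it does not seem to pick up the serdes supplied in Materialized. The application's default key and value Serde is StringSerde.
Any help is appreciated. TIA. I also tried changing the custom object to a generated Avro class, but the same error persists.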
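The innermost cause in the trace (GeoFenceDTO cannot be cast to java.lang.String inside StringSerializer.serialize) can be reproduced without Kafka. The sketch below is only an illustration of that mechanism: the Serializer interface, StringSerializer and GeoFenceDTO here are simplified, hypothetical stand-ins, not the Kafka or com.loconav classes. It shows why a serializer picked at runtime (for example from default configuration) fails only when a record of the wrong type reaches it, since generics are erased and the compiler cannot check the pairing.

```java
import java.nio.charset.StandardCharsets;

public class DefaultSerializerMismatch {

    /** Hypothetical, simplified stand-in for Kafka's Serializer<T>. */
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Encodes text as UTF-8, like a String serializer would. */
    public static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for the GeoFenceDTO value type. */
    public static final class GeoFenceDTO {
        public final String geofenceId;
        public final String vehicleId;

        public GeoFenceDTO(String geofenceId, String vehicleId) {
            this.geofenceId = geofenceId;
            this.vehicleId = vehicleId;
        }
    }

    /**
     * Passes a value to a serializer that was chosen at runtime.
     * The unchecked cast compiles; a mismatch only shows up when the
     * serializer's bridge method casts the value to its own type.
     */
    @SuppressWarnings("unchecked")
    public static byte[] send(Serializer<?> configured, Object value) {
        return ((Serializer<Object>) configured).serialize("some-topic", value);
    }

    /** Returns true if sending the value fails with a ClassCastException. */
    public static boolean failsWithClassCast(Serializer<?> configured, Object value) {
        try {
            send(configured, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Serializer<?> defaultSerializer = new StringSerializer();
        // A String value matches the String serializer.
        System.out.println(failsWithClassCast(defaultSerializer, "plain text"));
        // A DTO value does not, and the failure surfaces at runtime.
        System.out.println(failsWithClassCast(defaultSerializer, new GeoFenceDTO("g1", "v1")));
    }
}
```

Running main prints false for the String value and true for the DTO, which matches the shape of the exception in the trace: some step is writing GeoFenceDTO values with the default String serializer.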
Solution
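Judging from the stack trace alone, the failing SinkNode sits directly after KStreamMap and KStreamFilter, which is consistent with the internal repartition topic that groupBy creates when it changes the key. Materialized.with configures the serdes of the aggregation's state store, while the repartition topic is written with the serdes passed to groupBy, or with the application defaults if none are passed. A minimal sketch of supplying them at both steps follows, assuming kafka-streams 2.1 or later on the classpath; the class GroupedSerdeSketch, the method build and its parameters are hypothetical names for illustration, and the trivial aggregator stands in for the real logic.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public final class GroupedSerdeSketch {

    private GroupedSerdeSketch() {}

    /**
     * Builds a re-key + aggregate topology with explicit serdes at every
     * step that writes to a topic or store, so the defaults are never used.
     */
    public static <V> Topology build(
            String inputTopic,
            String outputTopic,
            Serde<V> valueSerde,
            KeyValueMapper<String, V, String> newKey,
            Initializer<V> initializer) {

        StreamsBuilder builder = new StreamsBuilder();

        builder.stream(inputTopic, Consumed.with(Serdes.String(), valueSerde))
            // groupBy changes the key, so Kafka Streams inserts a repartition
            // topic here; Grouped.with tells it how to write that topic.
            .groupBy(newKey, Grouped.with(Serdes.String(), valueSerde))
            .aggregate(
                initializer,
                // Placeholder aggregator: keeps the latest value.
                (key, value, aggregate) -> value,
                // Serdes for the state store backing the aggregation.
                Materialized.with(Serdes.String(), valueSerde))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), valueSerde));

        return builder.build();
    }
}
```

Applied to the topology in the question, this would mean passing Grouped.with(Serdes.String(), getGeofenceDTOValueSerde()) as the second argument to groupBy. On Kafka Streams versions before 2.1, Serialized.with(...) takes the place of Grouped.with(...).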