java - Using Kafka Streams with Serdes that rely on schema references in headers
Problem description
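I am trying to perform a KTable-KTable foreign-key join on CDC data with Kafka Streams. The data I read is in Avro format, but it is serialized in a way that is incompatible with other industry serializers/deserializers (for example, the Confluent Schema Registry ones), because the schema identifier is stored in the record headers.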
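When I set up the Serdes for my KTables, my Kafka Streams application runs at first but eventually fails. Inside the wrapping serializer ValueAndTimestampSerializer, it internally calls the Serializer method byte[] serialize(String topic, T data) instead of the headers-aware one, byte[] serialize(String topic, Headers headers, T data). The Serdes I am using cannot handle this and throw an exception.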
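To make the failure mode concrete, here is a simplified, dependency-free model of the two overloads. It assumes, as in Kafka's own Serializer interface, that the headers-aware overload is a default method that simply delegates to serialize(topic, data). SimpleSerializer, HeaderOnlySerializer and wrapWithoutHeaders are hypothetical names, and a Map stands in for Kafka's Headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class DefaultMethodPitfall {

    // Simplified model of Kafka's Serializer: the headers-aware overload is a
    // default method that ignores the headers and delegates to the 2-arg version.
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, Map<String, byte[]> headers, T data) {
            return serialize(topic, data);
        }
    }

    // Hypothetical header-dependent serializer: it must write the schema id to a
    // header, so it cannot do anything useful when no headers are passed in.
    static class HeaderOnlySerializer implements SimpleSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            throw new UnsupportedOperationException("schema id must go into the headers");
        }

        @Override
        public byte[] serialize(String topic, Map<String, byte[]> headers, String data) {
            headers.put("schema-id", new byte[] {0, 0, 0, 42}); // hypothetical header name/value
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Models a wrapping serializer that, as described above, only calls the
    // 2-arg overload, so the header-dependent serializer always fails.
    static <T> byte[] wrapWithoutHeaders(SimpleSerializer<T> inner, String topic, T data) {
        return inner.serialize(topic, data);
    }
}
```

Calling the three-argument overload directly succeeds, while going through the wrapper throws, which mirrors the exception described in the question.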
My first question: does anyone know a way to coax Kafka Streams into internally calling the method with the correct signature?
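I am exploring workarounds, including writing new Serdes that re-serialize the data with the schema identifier stored in the message itself. This could involve copying the data to a new topic or using interceptors.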
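The re-serialization idea can be sketched as a byte-level reframing. This assumes the target is the Confluent Schema Registry wire format (a 0x0 magic byte, then the 4-byte big-endian schema id, then the Avro binary body) and that the header carries the schema id as a 4-byte big-endian int. SchemaIdFraming and its methods are hypothetical names; this is plain JDK code and does not touch any Kafka API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class SchemaIdFraming {
    // Confluent wire format: magic byte 0x0, 4-byte big-endian schema id, Avro body.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int PREFIX_LENGTH = 1 + Integer.BYTES;

    private SchemaIdFraming() {}

    /** Prepends the magic byte and schema id to the Avro-encoded body. */
    public static byte[] frame(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(PREFIX_LENGTH + avroBody.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroBody)
                .array();
    }

    /** Reads the schema id back out of a framed payload. */
    public static int schemaId(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    /** Returns the Avro body without the 5-byte prefix. */
    public static byte[] body(byte[] framed) {
        return Arrays.copyOfRange(framed, PREFIX_LENGTH, framed.length);
    }

    /** Assumption: the schema-id header value is a 4-byte big-endian int. */
    public static int headerToSchemaId(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }
}
```

Because the id travels inside the bytes, a Serde built on this framing keeps working even when Kafka Streams calls only serialize(topic, data).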
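However, I know that a ValueTransformer can access the ProcessorContext, including the record headers. The idea is to first read the value as a byte[] and then deserialize it into the Avro class inside transformValues() (see the example below). However, when I do this, I get an exception.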
StreamsBuilder builder = new StreamsBuilder();
// Read the raw value bytes so the transformer can deserialize them using the record headers
final KTable<Long, MySpecificClass> myTable = builder.table(
        "my-topic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);
...
// Foreign-key join: MySpecificClass::getJoinKey extracts the key of rightTable
KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);
joinResultTable.toStream()...
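import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

// Deserializes the raw bytes using the record headers, which carry the schema identifier
public class MyDeserializerTransformer implements
        ValueTransformerWithKey<Long, byte[], MySpecificClass> {

    private MyAvroDeserializer deserializer;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        deserializer = new MyAvroDeserializer();
        this.context = context;
    }

    @Override
    public MySpecificClass transform(Long key, byte[] value) {
        // Keep tombstones (deleted CDC rows) as null instead of deserializing them
        if (value == null) {
            return null;
        }
        // Use the header-aware overload so the schema id in the headers is available
        return deserializer.deserialize(context.topic(), context.headers(), value);
    }

    @Override
    public void close() {
    }
}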
When I run this, I get a ClassCastException. How can I fix this or work around it? Do I need to use a secondary state store?
"class": "org.apache.kafka.streams.errors.StreamsException",
"msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.\nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
"stack": [
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
"class": "java.lang.ClassCastException",
"msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
"stack": [
"org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
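Solution
I was able to solve this by first reading the input topic as a KStream and then, as a second step, converting it to a KTable with a different Serde. It seems the state stores do not call the header-aware serializer/deserializer method signatures.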
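A minimal sketch of this workaround, assuming Kafka Streams 3.3+ (for KStream.processValues) and using hypothetical stand-ins: String plays the role of MySpecificClass, decode() plays the role of MyAvroDeserializer, and "schema-id" is an assumed header name. Headers are still reachable on the KStream, so the value is decoded there, and toTable() then materializes it with a serde that never needs headers. This is consistent with the stack trace above, where the foreign-key join serialized the transformed MySpecificClass value with ByteArraySerializer rather than with a serde for MySpecificClass.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class StreamThenTable {

    // Hypothetical stand-in for MyAvroDeserializer: requires the "schema-id" header.
    static String decode(Headers headers, byte[] body) {
        Header schemaId = headers.lastHeader("schema-id");
        if (schemaId == null) {
            throw new IllegalStateException("missing schema-id header");
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    // Step 1: on the KStream the record headers are still available, so decode here.
    static class HeaderAwareDecoder implements FixedKeyProcessor<Long, byte[], String> {
        private FixedKeyProcessorContext<Long, String> context;

        @Override
        public void init(FixedKeyProcessorContext<Long, String> context) {
            this.context = context;
        }

        @Override
        public void process(FixedKeyRecord<Long, byte[]> record) {
            // Pass tombstones through as null so toTable() treats them as deletes.
            String decoded = record.value() == null ? null : decode(record.headers(), record.value());
            context.forward(record.withValue(decoded));
        }
    }

    static Topology build(String topic) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<Long, String> table = builder
                .stream(topic, Consumed.with(Serdes.Long(), Serdes.ByteArray()))
                .processValues(HeaderAwareDecoder::new)
                // Step 2: materialize with a serde that does not depend on headers.
                .toTable(Materialized.with(Serdes.Long(), Serdes.String()));
        // The foreign-key join would be built on `table` here.
        return builder.build();
    }
}
```

In the real application, Serdes.String() would be replaced by a serde for MySpecificClass that works through serialize(topic, data) alone, for example one that embeds the schema id in the payload. On versions older than 3.3, KStream.transformValues() with ProcessorContext.headers() can play the role of processValues().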