首页 > 解决方案 > Kafka如何处理同一主题中不同类型的记录,包括生产和消费

问题描述

我们的用例将在同一主题内的 kafka 中生成两种不同类型(两个类)的记录。

我们有两个生产者,在每个配置值.serializer中,我们实现一个序列化器,专门针对这个记录类,比如Class1Serializer,Class2Serializer

我们使用两个 KafkaStream 来消费同一个主题。但是在不同的组中,所以这两个流在同一个分区上工作。此外,我们为每个流分配了两个不同的 Serializer 和 Deserializer。我们想要的是让stream1只处理Class1,让stream2只处理Class2。

我们发现的现象是,当我们在生产者上使用生产记录时,这两个流都会消耗它。但是一个流会在反序列化时报告错误。

我的问题是,是否可以设置两个 KafkaStreams 以在同一主题上使用不同类型的记录?

我们使用的生产者是 org.apache.kafka:kafka-client

我们使用的流是 org.apache.kafka:kafka-stream-scala

{
// here define the serialize and deserializer for class1
implicit val serde1: Class1Serde = new Class1Serde

val builder: StreamsBuilder = new StreamsBuilder
val class1KStream: KStream[String, Class1] = builder.stream[String, Class1](topic)
class1KStream.inner.foreach(do something)
val topology1 :Topology = builder.build()
val stream1 = new KafkaStreams(topology1, class1Properties)
}
{
// here define the serialize and deserializer for class2
implicit val serde2: Class2Serde = new Class2Serde

val builder: StreamsBuilder = new StreamsBuilder
val class2KStream: KStream[String, Class1] = builder.stream[String, Class2](topic)
class2KStream.inner.foreach(do something)
val topology2 :Topology = builder.build()
val stream2 = new KafkaStreams(topology2, class1Properties)
}

标签: apache-kafkaapache-kafka-streamskafka-producer-apikafka-serializer

解决方案


推荐阅读