apache-kafka - Kafka如何处理同一主题中不同类型的记录,包括生产和消费
问题描述
我们的用例将在同一主题内的 kafka 中生成两种不同类型(两个类)的记录。
我们有两个生产者,在每个配置值.serializer中,我们实现一个序列化器,专门针对这个记录类,比如Class1Serializer,Class2Serializer
我们使用两个 KafkaStream 来消费同一个主题。但是在不同的组中,所以这两个流在同一个分区上工作。此外,我们为每个流分配了两个不同的 Serializer 和 Deserializer。我们想要的是让stream1只处理Class1,让stream2只处理Class2。
我们发现的现象是,当我们在生产者上使用生产记录时,这两个流都会消耗它。但是一个流会在反序列化时报告错误。
我的问题是,是否可以设置两个 KafkaStreams 以在同一主题上使用不同类型的记录?
我们使用的生产者是 org.apache.kafka:kafka-client
我们使用的流是 org.apache.kafka:kafka-stream-scala
{
// here define the serialize and deserializer for class1
implicit val serde1: Class1Serde = new Class1Serde
val builder: StreamsBuilder = new StreamsBuilder
val class1KStream: KStream[String, Class1] = builder.stream[String, Class1](topic)
class1KStream.inner.foreach(do something)
val topology1 :Topology = builder.build()
val stream1 = new KafkaStreams(topology1, class1Properties)
}
{
// here define the serialize and deserializer for class2
implicit val serde2: Class2Serde = new Class2Serde
val builder: StreamsBuilder = new StreamsBuilder
val class2KStream: KStream[String, Class1] = builder.stream[String, Class2](topic)
class2KStream.inner.foreach(do something)
val topology2 :Topology = builder.build()
val stream2 = new KafkaStreams(topology2, class1Properties)
}
解决方案
推荐阅读
- elasticsearch - 弹性搜索上的滚动时间增量效果
- javascript - 在 Selenium 中使用 Xpath 改变 innerHTML;Python 创建多个导致 SyntaxError: Invalid or unexpected token 的转义
- php - 致命错误:在布尔值上调用成员函数 bind_param()。PHP 错误
- python - Selenium 很好地检测到弹出对话框,但无法处理它
- python - 计算python列表中对象的出现次数
- reactjs - 图像 URL 同时采用 localhost 和服务器主机
- python - Django:会话在 Heroku 上没有按预期工作
- android - 程序类型已经存在:com.opencsv.CSVParser
- python - 为什么 tf.get_variable('test') 返回一个名为 test_1 的变量?
- python - 在“if”条件下使用变量时的 Python 3 NameError