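spring-kafka - Different consumers with their own filter strategies
Problem description
In a Spring Boot application, can we have 2 different consumers, each with its own filter strategy? If so, how do we hook each filter strategy to its corresponding consumer? (boot-2.3.8) Note: the properties are different for the two consumers.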
@KafkaListener(topics = "topic1", groupId = "group-id1",
        properties = {
                "max.poll.interval.ms: ${max.poll.interval.ms}",
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
        }, autoStartup = "true")
public void onMessage(
        @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    processMessageAndAcknowledge(consumerRecord, acknowledgment);
}

@Bean
public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic1() {
    return rec -> true;
}

@KafkaListener(topics = "topic2", groupId = "group-id2",
        properties = {
                "max.poll.interval.ms: ${max.poll.interval.ms}",
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":localhost:9092",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ":false",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + ":${auto.offset.reset}"
        }, autoStartup = "true")
public void onMessage(
        @Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    processMessageAndAcknowledge(consumerRecord, acknowledgment);
}

@Bean
public RecordFilterStrategy<Object, Object> recordFilterStrategyForTopic2() {
    return rec -> true;
}
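Solution
It is not currently possible to override the filter strategy at the listener level; you would have to create two container factories. Feel free to open a new feature request on GitHub issues. https://github.com/spring-projects/spring-kafka/issues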
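A minimal sketch of the two-factory approach, assuming Spring Boot's auto-configured ConsumerFactory<Object, Object> is available and that the two RecordFilterStrategy beans from the question exist. The class name ListenerFactoryConfig, the helper factoryWith, and the bean names topic1ContainerFactory and topic2ContainerFactory are hypothetical, chosen only for illustration. Other container settings, such as the ack mode required for a manual Acknowledgment, are left out.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@Configuration
public class ListenerFactoryConfig {

    // Factory for the topic1 listener; applies only the topic1 filter.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> topic1ContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            @Qualifier("recordFilterStrategyForTopic1") RecordFilterStrategy<Object, Object> filter) {
        return factoryWith(consumerFactory, filter);
    }

    // Factory for the topic2 listener; applies only the topic2 filter.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> topic2ContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            @Qualifier("recordFilterStrategyForTopic2") RecordFilterStrategy<Object, Object> filter) {
        return factoryWith(consumerFactory, filter);
    }

    // Hypothetical helper: builds a factory that shares the consumer factory
    // but carries its own filter strategy.
    private static ConcurrentKafkaListenerContainerFactory<Object, Object> factoryWith(
            ConsumerFactory<Object, Object> consumerFactory,
            RecordFilterStrategy<Object, Object> filter) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setRecordFilterStrategy(filter);
        return factory;
    }
}
```

Each listener would then select its factory through the containerFactory attribute, for example @KafkaListener(topics = "topic1", groupId = "group-id1", containerFactory = "topic1ContainerFactory"), keeping its existing properties unchanged.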
A quick work-around is to write a wrapper strategy that delegates to the appropriate strategy based on the topic in the ConsumerRecord.
return rec -> rec.topic().equals("topic1")
? recordFilterStrategyForTopic1.filter(rec)
: recordFilterStrategyForTopic2.filter(rec);
EDIT
Here is a complete example:
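import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So67391819Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391819Application.class, args);
    }

    @KafkaListener(id = "so67391819-1", topics = "so67391819-1")
    void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener1 " + in + "@" + offset);
    }

    @KafkaListener(id = "so67391819-2", topics = "so67391819-2")
    void listen2(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("listener2 " + in + "@" + offset);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so67391819-1").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so67391819-2").partitions(1).replicas(1).build();
    }

    // filter() returning true means "discard": f1 drops even offsets.
    @Bean
    RecordFilterStrategy<String, String> f1() {
        return rec -> rec.offset() % 2 == 0;
    }

    // f2 drops odd offsets.
    @Bean
    RecordFilterStrategy<String, String> f2() {
        return rec -> rec.offset() % 2 == 1;
    }

    // Sends ten records to each topic on startup.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                template.send("so67391819-1", "foo" + i);
                template.send("so67391819-2", "bar" + i);
            });
        };
    }

}

@Component
class FactoryCustomizer {

    // Installs a single wrapper strategy on the shared factory; it routes
    // each record to f1 or f2 based on the topic name.
    FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
            RecordFilterStrategy<String, String> f1,
            RecordFilterStrategy<String, String> f2) {
        factory.setRecordFilterStrategy(rec -> rec.topic().endsWith("-1") ? f1.filter(rec) : f2.filter(rec));
    }

}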
listener1 foo1@1
listener2 bar0@0
listener1 foo3@3
listener1 foo5@5
listener2 bar2@2
listener1 foo7@7
listener2 bar4@4
listener1 foo9@9
listener2 bar6@6
listener2 bar8@8
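The output above follows from the RecordFilterStrategy contract: filter returns true when the record should be discarded, so f1 drops the even offsets on the "-1" topic and f2 drops the odd offsets on the "-2" topic. The following plain-Java sketch reproduces only that routing logic and is not Spring Kafka code: Rec is a hypothetical stand-in for ConsumerRecord, and java.util.function.Predicate stands in for RecordFilterStrategy, so it runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterDemo {

    /** Hypothetical stand-in for ConsumerRecord: only topic and offset. */
    static final class Rec {
        final String topic;
        final long offset;

        Rec(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    // Same predicates as f1 and f2 in the example; true means "discard".
    static final Predicate<Rec> F1 = rec -> rec.offset % 2 == 0;
    static final Predicate<Rec> F2 = rec -> rec.offset % 2 == 1;

    // Same routing rule as FactoryCustomizer: pick the delegate by topic name.
    static final Predicate<Rec> WRAPPER =
            rec -> rec.topic.endsWith("-1") ? F1.test(rec) : F2.test(rec);

    /** Offsets in [0, count) on the given topic that would reach the listener. */
    public static List<Long> delivered(String topic, int count) {
        List<Long> kept = new ArrayList<>();
        for (long offset = 0; offset < count; offset++) {
            if (!WRAPPER.test(new Rec(topic, offset))) {
                kept.add(offset);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // prints "listener1 receives offsets [1, 3, 5, 7, 9]"
        System.out.println("listener1 receives offsets " + delivered("so67391819-1", 10));
        // prints "listener2 receives offsets [0, 2, 4, 6, 8]"
        System.out.println("listener2 receives offsets " + delivered("so67391819-2", 10));
    }
}
```

By the same rule, a strategy that always returns true, like the placeholder beans in the question, would discard every record.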