java - 如何在架构级别监听 kafka 事件
问题描述
我是 Kafka 的新手,我正在尝试监听具有模式级别的事件。我有来自同一主题的多个事件,但模式名称不同。我只想听特定的模式事件。
Kafka Producer 事件示例:
示例 1:
{"id":"237","metadata":{"timestamp":1464275609527,**"schema”:”customer_add”**,”schemaVersion":1,"type":"EVENT","routingKey":{"type":"simple","value":"null"},"lookupKey":{"type":"simple","value":"null"},"tenant":"java-consumerapis","stream”:” DEAL”,”sender":""},"data":{"user_id":"1212315","name":"abhinav agraawal","mobile":"7624444444”,”email":"user1@gmail.com”}}
示例 2:
{"id":"237","metadata":{"timestamp":1564275609527,**"schema”:”customer_call”**,”schemaVersion":1,"type":"EVENT","routingKey":{"type":"simple","value":"null"},"lookupKey":{"type":"simple","value":"null"},"tenant":"java-consumerapis","stream”:” DEAL”,”sender":""},"data":{"user_id":"1212331","name”:"Divya agraawal","mobile":"7624453553”,”email":"user13@gmail.com”}}
这里两个事件都是从DEAL
主题产生的,但两个事件的模式都是事件,所以我只想听customer_call schema
事件。
@KafkaListener(topics = "DEAL", groupId = "group_id")
public void consume(String message) {
System.out.println(message);
logger.info(String.format("$$ -> Consumed Message -> %s", message));
String bootstarpserver="127.0.0.1:9092";
String groupId="user011";
String topic="DEAL";
// create topic
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarpserver);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// create consumer
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<>(properties);
// subscribe consumer to our topic
kafkaConsumer.subscribe(Collections.singleton(topic));
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records){
logger.info("key :: "+ record.key() + " value is :: " +record.value());
logger.info(("partition :: "+ record.partition()+ "offset:: "+ record.offset()));
}
}
在这里,我正在监听所有事件,但我可以在 Kafka 消费者级别过滤我的事件。
解决方案
如果您使用流式 API,您可以轻松地做到这一点,如果您在消费者 API 中需要它,您可以使用 java 8 来做到这一点,
KStream<String,String> inputStream = builder.stream("topicname");
KStream<String,String> filtered = inputStream.filter((k,v)->v.
contains("\"schema\":\"customer_call\""));
推荐阅读
- python - 为什么scrapy spider在抓取amazon时没有返回输出?
- python-3.x - 如何根据与其他行的重合来组合 pandas DataFrame 上的行
- html - Blogdown Hugo 网站特色列表显示嵌入式 HTML
- jquery - 动态文本区域不会根据 Jquery 中页面加载时的输入值进行扩展
- selector - Grafana 仪表板选择器卡住
- sql-server - SSRS - 适合 PDF 导出到一页
- python - Python Selenium 中的循环列表
- operating-system - 删除一个并创建新的 MBR 分区排列
- ansible - ansible如何从另一个角色加载变量,而不执行它?
- java - 尝试在 Maven 插件中执行 jar