首页 > 解决方案 > 如何在架构级别监听 kafka 事件

问题描述

我是 Kafka 的新手,我正在尝试监听具有模式级别的事件。我有来自同一主题的多个事件,但模式名称不同。我只想听特定的模式事件。

Kafka Producer 事件示例:
示例 1:

{"id":"237","metadata":{"timestamp":1464275609527,**"schema”:”customer_add”**,”schemaVersion":1,"type":"EVENT","routingKey":{"type":"simple","value":"null"},"lookupKey":{"type":"simple","value":"null"},"tenant":"java-consumerapis","stream”:” DEAL”,”sender":""},"data":{"user_id":"1212315","name":"abhinav agraawal","mobile":"7624444444”,”email":"user1@gmail.com”}}

示例 2:

{"id":"237","metadata":{"timestamp":1564275609527,**"schema”:”customer_call”**,”schemaVersion":1,"type":"EVENT","routingKey":{"type":"simple","value":"null"},"lookupKey":{"type":"simple","value":"null"},"tenant":"java-consumerapis","stream”:” DEAL”,”sender":""},"data":{"user_id":"1212331","name”:"Divya agraawal","mobile":"7624453553”,”email":"user13@gmail.com”}}

这里两个事件都是从DEAL主题产生的,但两个事件的模式都是事件,所以我只想听customer_call schema事件。

@KafkaListener(topics = "DEAL", groupId = "group_id")
public void consume(String message) {
    System.out.println(message);

    logger.info(String.format("$$ -> Consumed Message -> %s", message));


    String bootstarpserver="127.0.0.1:9092";
    String groupId="user011";
    String topic="DEAL";


    // create topic
    Properties properties=new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarpserver);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");


    // create consumer

    KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<>(properties);

    // subscribe consumer to our topic
    kafkaConsumer.subscribe(Collections.singleton(topic));


    while (true){
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String,String> record : records){
            logger.info("key :: "+ record.key() + " value is :: " +record.value());
            logger.info(("partition ::  "+ record.partition()+ "offset::  "+ record.offset()));
        }


    }

在这里,我正在监听所有事件,但我可以在 Kafka 消费者级别过滤我的事件。

标签: javaspring-bootapache-kafkakafka-consumer-api

解决方案


如果您使用流式 API,您可以轻松地做到这一点,如果您在消费者 API 中需要它,您可以使用 java 8 来做到这一点,

KStream<String,String> inputStream = builder.stream("topicname");
KStream<String,String> filtered = inputStream.filter((k,v)->v.
                  contains("\"schema\":\"customer_call\""));

推荐阅读