首页 > 解决方案 > Apache Flink Kafka 消费者问题

问题描述

我在 Kafka 中有数据,我想读取 Kafka 是否发送数据的数据,然后过滤它们并返回 JSON。

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
       
        properties.setProperty("group.id", "flink_consumer");


        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
                new SimpleStringSchema(), properties);
        consumer.setStartFromLatest();
        //config.setWriteTimestampToKafka(true);

        DataStream<String> stream = env.addSource(consumer);

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String map(String value) throws Exception {
                
                return "Stream Value: " + value;
            }
        }).print();
        env.execute();

案例 1:当 Kafka 生产者将数据发送到 Kafka 时,我可以在控制台中看到值打印。- 这很好。案例 2:Kafka 生产者停止发送数据,Kafka 在主题中仍然有价值,但相同的代码没有返回任何数据。 - 这可能吗?

知道在哪里犯错了吗?

{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}

我想要过滤firstname并返回 JSON。

我看到在数据流处理过程中有过滤器选项。

标签: apache-kafkaapache-flinkflink-streaming

解决方案


如果你想从第一条消息开始,你应该设置consumer.setStartFromEarliest();. 它将从第一个未确认的消息开始读取。


推荐阅读