首页 > 解决方案 > 从 FlinkKafkaConsumer 迁移到 KafkaSource,没有执行任何窗口

问题描述

我是一个kafka和flink初学者。我已经实现FlinkKafkaConsumer了使用来自 kafka-topic 的消息。除了“组”和“主题”之外,唯一的自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")启用多次重新阅读相同的消息。它开箱即用,可用于消费和逻辑。现在FlinkKafkaConsumer已弃用,我想更改为继任者KafkaSource

KafkaSource使用与我相同的参数进行初始化会FlinkKafkaConsumer产生预期的主题读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行并未完成,因此不会产生任何结果。

我假设某些默认设置与 中KafkaSource的不同FlinkKafkaConsumer,但我不知道它们可能是什么。

KafkaSource -不工作

 KafkaSource<TestData> source = KafkaSource.<TestData>builder()
                .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
                .setTopics(TOPIC)
                .setDeserializer(new CustomDeserializer())
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        DataStream<TestData> testDataStreamSource = env.fromSource(
                source,
                WatermarkStrategy.
                        <TestData>noWatermarks(),
                "Kafka Source"
        );

卡夫卡消费者 -工作(属性包含group.idbootstrap.serverszookeeper.connect

    propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new  FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
    DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)

两个流都使用相同的管道,如下所示

testDataStreamSource
           .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <TestData>forMonotonousTimestamps().
                        withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
            .keyBy(TestData::getKey)
            .window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
            .process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
                    @Override
                    public void process(
            ....
            });

尝试过的事情

标签: apache-kafkaapache-flinkflink-streaming

解决方案


更新:答案是 KafkaSource 的行为不同于 FlinkKafkaConsumer 在 Kafka 分区数量小于 Flink 的 kafka source operator 的并行度的情况下。有关详细信息,请参阅https://stackoverflow.com/a/70101290/2000823

原答案:

几乎可以肯定,这个问题与时间戳和水印有关。

要验证时间戳和水印是否存在问题,您可以做一个快速实验,将 3 小时长的事件时间滑动窗口替换为处理时间较短的滚动窗口。

一般来说,最好(但不是必须)让 KafkaSource 进行水印处理。forMonotonousTimestamps正如您现在所做的那样,在源之后应用的水印生成器中使用是一个冒险的举动。只有当源的每个并行实例使用的所有分区中的时间戳按顺序处理时,这才能正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您forMonotonousTimestamps在 fromSource 调用(而不是noWatermarks)中提供水印策略,那么所需要的只是时间戳按分区顺序排列,我想就是这种情况。

令人不安的是,它可能还不足以解释为什么窗口不产生任何结果。另一个可能的根本原因是测试数据集在第一个窗口之后不包含任何带有时间戳的事件,因此该窗口永远不会关闭。

你有水槽吗?如果没有,那就可以解释了。

您可以使用 Flink 仪表板来帮助调试它。查看水印是否在窗口任务中推进。打开检查点,然后查看窗口任务有多少状态——它应该有一些非零状态。


推荐阅读