apache-kafka - 从 FlinkKafkaConsumer 迁移到 KafkaSource,没有执行任何窗口
问题描述
我是一个kafka和flink初学者。我已经实现FlinkKafkaConsumer
了使用来自 kafka-topic 的消息。除了“组”和“主题”之外,唯一的自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
启用多次重新阅读相同的消息。它开箱即用,可用于消费和逻辑。现在FlinkKafkaConsumer
已弃用,我想更改为继任者KafkaSource
。
KafkaSource
使用与我相同的参数进行初始化会FlinkKafkaConsumer
产生预期的主题读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行并未完成,因此不会产生任何结果。
我假设某些默认设置与 中KafkaSource
的不同FlinkKafkaConsumer
,但我不知道它们可能是什么。
KafkaSource -不工作
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
.setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
.setTopics(TOPIC)
.setDeserializer(new CustomDeserializer())
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStream<TestData> testDataStreamSource = env.fromSource(
source,
WatermarkStrategy.
<TestData>noWatermarks(),
"Kafka Source"
);
卡夫卡消费者 -工作(属性包含group.id
,bootstrap.servers
和zookeeper.connect
)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
两个流都使用相同的管道,如下所示
testDataStreamSource
.assignTimestampsAndWatermarks(WatermarkStrategy.
<TestData>forMonotonousTimestamps().
withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
.keyBy(TestData::getKey)
.window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
.process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
@Override
public void process(
....
});
尝试过的事情
- 我尝试过设置偏移量的提交,但并没有改善这种情况。
- 已在源中设置时间戳。
解决方案
更新:答案是 KafkaSource 的行为不同于 FlinkKafkaConsumer 在 Kafka 分区数量小于 Flink 的 kafka source operator 的并行度的情况下。有关详细信息,请参阅https://stackoverflow.com/a/70101290/2000823。
原答案:
几乎可以肯定,这个问题与时间戳和水印有关。
要验证时间戳和水印是否存在问题,您可以做一个快速实验,将 3 小时长的事件时间滑动窗口替换为处理时间较短的滚动窗口。
一般来说,最好(但不是必须)让 KafkaSource 进行水印处理。forMonotonousTimestamps
正如您现在所做的那样,在源之后应用的水印生成器中使用是一个冒险的举动。只有当源的每个并行实例使用的所有分区中的时间戳按顺序处理时,这才能正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您forMonotonousTimestamps
在 fromSource 调用(而不是noWatermarks
)中提供水印策略,那么所需要的只是时间戳按分区顺序排列,我想就是这种情况。
令人不安的是,它可能还不足以解释为什么窗口不产生任何结果。另一个可能的根本原因是测试数据集在第一个窗口之后不包含任何带有时间戳的事件,因此该窗口永远不会关闭。
你有水槽吗?如果没有,那就可以解释了。
您可以使用 Flink 仪表板来帮助调试它。查看水印是否在窗口任务中推进。打开检查点,然后查看窗口任务有多少状态——它应该有一些非零状态。
推荐阅读
- c# - 模拟用于 Azure 表存储的 CloudStorageAccount 和 CloudTable
- ruby-on-rails - 如何将 Rails 中的 send_data 方法用于整个索引视图,而不仅仅是第一页?
- java - Return all attributes in junit result xml using xpath and Java DOM Parser
- javascript - 在 Google 地图自动填充功能中排除某些位置
- c# - asp.net core docker container using Oracle Managed Driver Core. throws ORA-00604 and ORA-01882 when opening connection
- gimp - What does the 'gimp_histogram' procedure require to work?
- python - 添加表示熊猫数据框中每个组的中位数的列
- java - 远程使用 document4j 将 DOCX 转换为 PDF
- kotlin - 带有 Kafka 的 Axon 4.0。事件未处理第二次服务
- r - 使用ggplotGrob时如何避免grob名称中的随机后缀?