首页 > 解决方案 > flink 的 Kafka 水印策略在我的应用程序中不起作用

问题描述

我使用 flink 1.13.0 版本

当我尝试通过 flink doc 使用似乎不起作用的 Kafka Watermark Strategies 时,窗口进程功能将不会运行。

而且我想通过这种方式知道,水印的时间戳将使用kafka中的消耗时间或生产时间?

我的消费者的代码是这样的:

val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
  .setCommitOffsetsOnCheckpoints(true)
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)

并像这样使用窗口:

processStream
  .keyBy(_.num)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)
  .addSink(new SignLatSink(serverConfig.smsRuleRedis))
  .name("lat_count_sink")
  .uid("lat_count_sink")

和这样的拓扑图:

拓扑图

标签: apache-kafkaapache-flinkflink-streaming

解决方案


由于您没有在水印策略中指定时间戳分配器,因此您依赖 FlinkKafkaConsumer 为流记录分配时间戳。这仅在从 Kafka 读取的记录在其标头中有时间戳时才有效。否则,您需要实现时间戳分配器以从事件中提取时间戳。

请注意,您将无法实现 FlinkKafkaConsumer 可以应用的时间戳分配器,除非您还实现了反序列化器,FlinkKafkaConsumer 可以使用该反序列化器来生成带有时间戳的对象,然后可以提取该对象。否则,您可以选择在源之后的某处应用水印策略。

如果缺少时间戳不是问题,那么可能还有其他问题。例如,您可能有一个空闲的 Kafka 分区,或者缺少足够远的事件来关闭窗口。

顺便说一句,如果您的事件是按分区排列的,并且如果您在 FlinkKafkaConsumer 上调用 assignTimestampsAndWatermarks(您目前正在这样做),那么您可以使用forMonotonousTimestamps而不是forBoundedOutOfOrderness,它具有一些显着的优势。


推荐阅读