How to trigger a ProcessTimeTimer with Flink MiniCluster

Problem description

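I have a Flink KeyedCoProcessFunction that registers processing-time timers as part of a larger Flink streaming job, and I am trying to create a unit test for the whole job using the Flink MiniCluster. However, I cannot get the onTimer() callback in the KeyedCoProcessFunction to fire.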

Has anyone gotten this to work? Does it require any special configuration?

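Switching to event time works fine, so I am wondering whether processing-time timers simply do not work with the Flink MiniCluster or whether there is something wrong with my implementation.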

I wrote a simple test in Scala to see whether I could get it working.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory

class TimerTest extends AnyFlatSpec with BeforeAndAfter {

  private val SlotsPerTaskMgr = 1
  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "MiniCluster" should "trigger onTimer" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    implicit val longTypeInfo: TypeInformation[Long] = TypeInformation.of(classOf[Long])

    val sink = new TestListResultSink[Long]

    env.addSource(new MyLongSource(100))
      .keyBy(v => v)
      .process(new MyProccesor())
      .addSink(sink)

    env.execute()

    println("Received " + sink.getResult.size() + " output records.")
  }

}

class MyProccesor extends KeyedProcessFunction[Long, Long, Long] {

  private val log = LoggerFactory.getLogger(this.getClass)

  override def processElement(
                               value: Long,
                               ctx: KeyedProcessFunction[Long, Long, Long]#Context,
                               out: Collector[Long]): Unit = {
    log.info("Received {} at {}", value, ctx.timerService().currentProcessingTime())
    if (value % 10 == 0) {
      log.info("Scheduling processing timer for {}", ctx.timerService().currentProcessingTime() + 10)
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
    }
  }

  override def onTimer(
                        timestamp: Long,
                        ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
                        out: Collector[Long]): Unit = {
    log.info("Received onTimer at {}", timestamp)
    out.collect(timestamp)
  }
}

class MyLongSource(n:Int) extends ParallelSourceFunction[Long] {
  @volatile private var stop = false

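  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    for (i <- 1 to n) {
      if (stop) return
      println("Sending " + i)
      ctx.collect(i)
    }

    // Keep the source alive briefly so that pending processing-time timers
    // can fire before the job shuts down.
    Thread.sleep(1000)
  }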

  override def cancel(): Unit = {
    stop = true
  }
}

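By adding a Thread.sleep(1000) at the end of the source's run() method, I was finally able to get somewhat consistent results. It seems that once the source exits, the remaining messages are processed and then everything is shut down, even if there are pending timers.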

Tags: apache-flink, flink-streaming

Solution


When a Flink job shuts down, any pending processing-time timers are simply ignored. They never fire.
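
Given that behaviour, a test that relies on processing-time timers has to keep the job alive until the timers are due. The following is a minimal sketch of a source that does this, generalising the Thread.sleep(1000) workaround from the question. The class name LingeringLongSource and the lingerMs parameter are hypothetical and not part of Flink; only SourceFunction and its run()/cancel() methods come from the Flink API.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical helper, not part of Flink: emits 1..n, then keeps the source
// (and therefore the job) alive for lingerMs so pending timers can fire.
class LingeringLongSource(n: Int, lingerMs: Long) extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 1L
    while (running && i <= n) {
      ctx.collect(i)
      i += 1
    }
    // Poll in short steps instead of one long sleep so cancel() takes effect quickly.
    val deadline = System.currentTimeMillis() + lingerMs
    while (running && System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

In the test from the question, env.addSource(new LingeringLongSource(100, 1000L)) could take the place of new MyLongSource(100). The linger period should comfortably exceed the longest timer delay (10 ms in the question) plus the time records need to reach the operator; the right value is a judgment call for each test.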

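For what it's worth, there is an ongoing discussion on the Flink dev mailing list about providing an option to trigger all pending processing-time timers on shutdown. See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558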

