apache-flink - 如何使用 Flink MiniCluster 触发 ProcessTimeTimer
问题描述
我有一个KeyedCoProcessFunction
在更大的 Flink 流作业中注册处理时间计时器的 Flink,我正在尝试使用 Flink MiniCluster为整个作业创建单元测试。但我无法在触发器onTimer()
中恢复呼叫。KeyedCoProcessFunction
有没有人让这个工作?它需要任何特殊配置吗?
切换到事件时间工作正常,所以我想知道这是否不适用于 Flink MiniCluster 或者我的实现是否有问题。
我在 Scala 中编写了一个简单的测试,看看我是否可以让它工作。
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory
class TimerTest extends AnyFlatSpec with BeforeAndAfter {
private val SlotsPerTaskMgr = 1
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
.setNumberTaskManagers(1)
.build)
before {
flinkCluster.before()
}
after {
flinkCluster.after()
}
"MiniCluster" should "trigger onTimer" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
implicit val longTypeInfo: TypeInformation[Long] = TypeInformation.of(classOf[Long])
val sink = new TestListResultSink[Long]
env.addSource(new MyLongSource(100))
.keyBy(v => v)
.process(new MyProccesor())
.addSink(sink)
env.execute()
println("Received " + sink.getResult.size() + " output records.")
}
}
class MyProccesor extends KeyedProcessFunction[Long, Long, Long] {
private val log = LoggerFactory.getLogger(this.getClass)
override def processElement(
value: Long,
ctx: KeyedProcessFunction[Long, Long, Long]#Context,
out: Collector[Long]): Unit = {
log.info("Received {} at {}", value, ctx.timerService().currentProcessingTime())
if (value % 10 == 0) {
log.info("Scheduling processing timer for {}", ctx.timerService().currentProcessingTime() + 10)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
}
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
out: Collector[Long]): Unit = {
log.info("Received onTimer at {}", timestamp)
out.collect(timestamp)
}
}
class MyLongSource(n:Int) extends ParallelSourceFunction[Long] {
@volatile private var stop = false
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
for(i <- 1 to n) {
if(stop) return;
println("Sending " + i)
ctx.collect(i)
}
Thread.sleep(1000)
}
override def cancel(): Unit = {
stop = true
}
}
通过在源方法Thread.sleep(1000)
的末尾添加 a ,我终于能够获得一些一致的结果。run()
似乎一旦源退出,消息就会得到处理,然后即使有挂起的计时器,一切也会被关闭。
解决方案
当 Flink 作业关闭时,任何待处理的处理时间计时器都会被忽略。他们从不开火。
值得一提的是,在 Flink 开发邮件列表上正在进行一些关于提供触发所有待处理处理时间计时器的选项的讨论。请参阅http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558。
推荐阅读
- json-rpc - 有没有记录 json rpc api 的好工具?
- html - 如何在 Ipad 中更改方向时隐藏自动 HTML 选择元素
- reactjs - React Hook useEffect 缺少依赖项:props
- java - 如何计算导入到数组 CSV 文件中的行数?
- laravel - 检查不在数组中的数据与 2 个值匹配
- sql - T-SQL 条件“WHERE”子句和“IN”子句包含结果集中的所有键
- c - 在 C 中取消引用指向结构的指针时会发生什么?
- javascript - 如何在多次单击时添加输入字段
- html - iOS 14 上奇怪的 HTML Canvas 渲染问题 - 如在 iPhone 8 上所见
- mysql - 为什么以下查询会返回错误消息?(有人可以详细解释一下)