首页 > 解决方案 > 在 run() 中调用另一个 SourceFunction 的 run()?

问题描述

为了解决FLINK-2491我正在尝试链接SourceFunctions 以便在第一个退出时它不会受到伤害。基本思路如下:

class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) extends SourceFunction[Outer] {
  override def run(outerCtx: SourceContext[Outer]): Unit = {
    outerCtx.collect(...)


    val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
    innerSourceFunction.run(innerCtx)
  }

  override def cancel() = innerSourceFunction.cancel()
}

可以调用run()不同的SourceFunction内部run()并实现我自己的SourceContext委托给另一个内部吗?它适用于在本地 Flink 环境中运行的小型测试,但我想知道在生产环境中这样做是否会出现任何问题。

标签: apache-flinkflink-streaming

解决方案


推荐阅读