首页 > 解决方案 > 使用自定义源控制 CoProcessFunction 中已处理元素的顺序

问题描述

出于测试目的,我使用以下自定义源:

class ThrottledSource[T](
  data: Array[T],
  throttling: Int,
  beginWaitingTime: Int = 0,
  endWaitingTime: Int = 0
) extends SourceFunction[T] {

  private var isRunning = true
  private var offset = 0

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    Thread.sleep(beginWaitingTime)

    val lock = ctx.getCheckpointLock

    while (isRunning && offset < data.length) {
      lock.synchronized {
        ctx.collect(data(offset))
        offset += 1
      }
      Thread.sleep(throttling)
    }

    Thread.sleep(endWaitingTime)
  }

  override def cancel(): Unit = isRunning = false

并在我的测试中像这样使用它

val controlStream = new ThrottledSource[Control](
  data = Array(c1,c2), endWaitingTime = 10000, throttling = 0,
)

val dataStream = new ThrottledSource[Event](
  data = Array(e1,e2,e3,e4,e5),
  throttling = 1000,
  beginWaitingTime = 2000,
  endWaitingTime = 2000,
)

val dataStream = env.addSource(events)

env.addSource(controlStream)
  .connect(dataStream)
  .process(MyProcessFunction)

我的意图是首先获取所有控制元素(这就是我不指定 anybeginWaitingTime或 any的原因throttling)。在MyProcessFunction 内部processElement1processElement2内部,我在收到元素时打印它们。大多数情况下,我按预期首先获得两个控制元素,但有时令我惊讶的是,我首先获得数据元素,尽管数据源开始发出其元素时使用了两秒的延迟。谁能给我解释一下?

标签: apache-flink

解决方案


控制和数据流源操作符在不同的线程中运行,正如您所见,不能保证运行控制流的源实例在运行数据流的实例之前有机会运行。

您可以在此处查看答案及其在 github 上的相关代码,以找到一种可靠地完成此任务的方法。


推荐阅读