apache-flink - 使用自定义源控制 CoProcessFunction 中已处理元素的顺序
问题描述
出于测试目的,我使用以下自定义源:
class ThrottledSource[T](
data: Array[T],
throttling: Int,
beginWaitingTime: Int = 0,
endWaitingTime: Int = 0
) extends SourceFunction[T] {
private var isRunning = true
private var offset = 0
override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
Thread.sleep(beginWaitingTime)
val lock = ctx.getCheckpointLock
while (isRunning && offset < data.length) {
lock.synchronized {
ctx.collect(data(offset))
offset += 1
}
Thread.sleep(throttling)
}
Thread.sleep(endWaitingTime)
}
override def cancel(): Unit = isRunning = false
并在我的测试中像这样使用它
val controlStream = new ThrottledSource[Control](
data = Array(c1,c2), endWaitingTime = 10000, throttling = 0,
)
val dataStream = new ThrottledSource[Event](
data = Array(e1,e2,e3,e4,e5),
throttling = 1000,
beginWaitingTime = 2000,
endWaitingTime = 2000,
)
val dataStream = env.addSource(events)
env.addSource(controlStream)
.connect(dataStream)
.process(MyProcessFunction)
我的意图是首先获取所有控制元素(这就是我不指定 anybeginWaitingTime
或 any的原因throttling
)。在MyProcessFunction 内部processElement1
和processElement2
内部,我在收到元素时打印它们。大多数情况下,我按预期首先获得两个控制元素,但有时令我惊讶的是,我首先获得数据元素,尽管数据源开始发出其元素时使用了两秒的延迟。谁能给我解释一下?
解决方案
控制和数据流源操作符在不同的线程中运行,正如您所见,不能保证运行控制流的源实例在运行数据流的实例之前有机会运行。
您可以在此处查看答案及其在 github 上的相关代码,以找到一种可靠地完成此任务的方法。
推荐阅读
- javascript - 如何将身份验证令牌作为标头从 html 文件传递
- javascript - ReactJs,带有按钮的下拉菜单还是我应该使用其他东西?
- oop - 组合与继承
- c++ - C++ R-Value 有大小吗?
- excel - 这些电话号码是什么类型的
- amazon-web-services - How to setup EMR cluster which supports Impala?
- ios - 分享你的ios开发证书安全吗?
- javascript - 在 php 中使用 Ajax 以 Json 数据格式在控制台上显示数据库中的数据记录时出错
- python - python中的溢出错误?
- angular - 如何从 slickgrid 中的自定义组件获取输出?