apache-flink - 在 run() 中调用另一个 SourceFunction 的 run()?
问题描述
为了解决FLINK-2491我正在尝试链接SourceFunction
s 以便在第一个退出时它不会受到伤害。基本思路如下:
class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) extends SourceFunction[Outer] {
override def run(outerCtx: SourceContext[Outer]): Unit = {
outerCtx.collect(...)
val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
innerSourceFunction.run(innerCtx)
}
override def cancel() = innerSourceFunction.cancel()
}
可以调用run()
不同的SourceFunction
内部run()
并实现我自己的SourceContext
委托给另一个内部吗?它适用于在本地 Flink 环境中运行的小型测试,但我想知道在生产环境中这样做是否会出现任何问题。
解决方案
推荐阅读
- python - 从 django 可重用应用程序扩展 settings.py 中的 INSTALLED_APPS
- typescript - 添加监听器时为什么会出错?
- python - 在 pypi.org 上查找 Python 包导入某个包
- xampp - 如何解决xampp中找不到入口点错误?
- asp.net-core-mvc - 在 IIS 8.5 上发布 .NET Core 3.1 React 应用程序会导致 API 路由不被遵守
- angular - Angular 9 - 如何在不刷新页面的情况下调用 RestAPI?
- google-sheets - 添加新工作表时间接显示错误
- android - 从 InteliJ IDEA JDK 类路径中删除 JARS
- java - 试图在java中创建一个菜单来计算和显示一个圆的信息
- ios - 在 SwiftUI 中根据明暗模式更改 buttonStyle 修饰符