scala - 满足特定条件后如何停止 Flink Job
问题描述
我有一个带有 1 个输入源的 flink 应用程序,它通过几个映射并返回到 kafka 源。
基于某些条件,在单个地图中。我想以安全的方式停止工作中的地图和所有地图。并安全地停止工作。
如何从 flink 应用程序中停止 flink 作业?
我尝试添加一个硬故障。就像访问会使应用程序崩溃的空指针一样。
然而,有时只有那张地图会失败,而所有其他地图要么被取消(预期),要么有些只是挂起(取消)
val in_stream = env.addSource(myConsumer).name(topic_in).setParallelism(1)
.map(x => doSomethingA(x)
.map(x => doSomethingB(x)
.map(x => doSomethingC(x) //In this map i want to safely stop the job based on some condition
.map(x => doSomethingD(x)
.addSink(new FlinkKafkaProducer09[String](broker, topic_out, new SimpleStringSchema()))
我希望安全地退出工作(所有地图都已取消)我该怎么做?
解决方案
推荐阅读
- gradle - 当我只删除一个单词时,为什么 Body 构建中的代码会失败?
- python - 无法为具有值 ['type'] 的端点 'topic' 构建 url。你的意思是“topic_func”吗?
- ruby-on-rails - 未预编译资产时 Rails 5.2 服务器挂起
- python - mypy:在装饰器中更新的类签名
- django - Django:在 ListView 中定义自定义字段
- certificate - 如何更改由证书颁发机构请求证书创建的证书的有效期以上传到开发者帐户
- python-3.x - Python 3.x | 如何删除列表中的括号和撇号以获得更清晰的输出
- python - Python ImportError 没有名为 PIL 的模块
- c++ - 定义 Matx 的编译错误
OpenCV中的变量 - python - 修补类实例属性,该属性是另一个类的属性