首页 > 解决方案 > 满足特定条件后如何停止 Flink Job

问题描述

我有一个带有 1 个输入源的 flink 应用程序,它通过几个映射并返回到 kafka 源。

基于某些条件,在单个地图中。我想以安全的方式停止工作中的地图和所有地图。并安全地停止工作。

如何从 flink 应用程序中停止 flink 作业?

我尝试添加一个硬故障。就像访问会使应用程序崩溃的空指针一样。

然而,有时只有那张地图会失败,而所有其他地图要么被取消(预期),要么有些只是挂起(取消)

 val in_stream = env.addSource(myConsumer).name(topic_in).setParallelism(1)
  .map(x => doSomethingA(x)
  .map(x => doSomethingB(x)
  .map(x => doSomethingC(x) //In this map i want to safely stop the job based on some condition
  .map(x => doSomethingD(x)
  .addSink(new FlinkKafkaProducer09[String](broker, topic_out, new SimpleStringSchema()))

我希望安全地退出工作(所有地图都已取消)我该怎么做?

标签: scalaapache-flinkflink-streaming

解决方案


推荐阅读