首页 > 解决方案 > 当重新启动一个 flink 作业/作业正在进行故障转移时,你应该怎么做才能优雅地释放所有资源?

问题描述

我们有一个 flink 作业,它持有一些 IO 资源,例如 tcp 连接等。我们总是在连接函数周围加上一个 finally 块来关闭连接并在所有操作符的 close() 方法中释放连接。但是我们发现时不时发生故障转移时连接没有释放,因为我们在服务器端发现了很多 CLOSE_WAIT 状态。我们猜测 flink 可能会使用一些中断方法来重新启动作业,这样代码就不会进入 finally 块。当 flink 进行故障转移时,释放资源的正确方法是什么?

标签: tcpapache-flinkflink-streaming

解决方案


为了能够访问函数的flink生命周期,你应该在用户定义的函数中实现你的逻辑,实现RichFunction接口。此类定义函数生命周期的方法,以及访问函数执行的上下文的方法。

除其他外,此接口公开了close()应该用于清理工作的方法:


    /**
     * Tear-down method for the user code. It is called after the last call to the main working methods
     * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an iteration, this method will
     * be invoked after each iteration superstep.
     *
     * <p>This method can be used for clean up work.
     *
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
     *                   runtime catches an exception, it aborts the task and lets the fail-over logic
     *                   decide whether to retry the task execution.
     */
    void close() throws Exception;

所以,我相信这个close()函数是优雅释放资源的合适位置。


推荐阅读