首页 > 解决方案 > RichAsyncFunction 中的 Apache flink 超时处理

问题描述

我正在使用 1.5.0 版本的 flink。

官方文档说: Timeout Handling When an async I/O request times out, by default an exception is thrown and job is restarted. If you want to handle timeouts, you can override the AsyncFunction#timeout method.

1 - 事实上,当发生超时异常时,日志中没有任何概念(任何级别)并且作业没有失败。

2 - AsyncFunction 接口中没有方法超时:)

为什么这与我有关->

我使用 AsyncDataStream.unorderedWait,它从 kafka 消耗并使用 RichAsyncFunction 处理结果。事实上,所有消息都已读取(正确的偏移量已提交给 zookeeper),但并非所有消息都到达 RichAsyncFunction.asyncInvoke。由于它不记录超时异常,我只是想知道它是否也会吞噬(不显示)其他异常。

有任何想法吗 ?非常感谢 !

标签: javaapache-flinkflink-streaming

解决方案


好的,看来我找到了原因:

AsyncDataStream.unorderedWait 从流中使用并将元素传递给 RichAsyncFunction 包装器。但是,如果 RichAsyncFunction.open 方法仍在执行并且元素在超时传递给 unorderedWait 后静默死亡,则流中的元素不会传递给 asyncInvoke。


推荐阅读