How to catch the exception thrown by flink's readFile(path)?

Problem Description

I use flink to monitor new files in hdfs (the files are in gzip format) and process them.

env.readFile(filePath)
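
For reference, a fuller version of this kind of setup might look like the sketch below. The details are assumptions, not the asker's actual code: the input format, the watch interval, and the path are illustrative. readFile with FileProcessingMode.PROCESS_CONTINUOUSLY keeps re-scanning the path and picks up new files; TextInputFormat decompresses .gz files transparently based on the file extension, which is where the failure described below occurs.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String filePath = "hdfs://mdw:8020/user/data";          // assumed monitored directory
TextInputFormat format = new TextInputFormat(new Path(filePath));

DataStream<String> lines = env.readFile(
        format,
        filePath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,         // keep watching for new files
        10_000L);                                        // re-scan interval in ms (assumed)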

It works when the files are valid,

but if a gzip file is invalid, the flink job is killed.

The exception log:

java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:472) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:49) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:381) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:88) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:112) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:322) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[k.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.util.zip.ZipException: Not in GZIP format
    at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) ~[?:1.8.0_181]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:43) ~[k.jar:?]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:32) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:848) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:820) ~[k.jar:?]
    ... 16 more

I want to skip the invalid files instead of killing flink.

But I don't know how to catch the exception, because it is thrown by flink's internal code.

What should I do?

Tags: apache-flink

Solution


We had a more specific requirement, so we ended up writing a custom FlatMapFunction that is constructed with the list of directories to check, and that receives regular "tickler" events from a custom source. When it receives such an event in its flatMap() method, it checks whether there are any new files (matching certain criteria); if so, it opens each file, reads the entries, and emits them via the Collector. In this setup we have full control over error handling.
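
Below is a minimal sketch of that approach. It is not the author's actual code: the class name GzipDirectoryReader, the use of a Long timestamp as the tickler event, the ".gz" filter, and the in-memory set of processed files are assumptions for illustration. The key point is that the gzip stream is opened inside your own function, so a corrupt file can be caught and skipped instead of failing the job.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPInputStream;

/**
 * On each tickler event, scans the configured directories for new .gz files,
 * reads them line by line, and skips files that fail to open or decompress.
 */
public class GzipDirectoryReader extends RichFlatMapFunction<Long, String> {

    private final List<String> directories;                 // e.g. "hdfs://mdw:8020/user/data"
    private final Set<String> alreadyProcessed = new HashSet<>();

    public GzipDirectoryReader(List<String> directories) {
        this.directories = directories;
    }

    @Override
    public void flatMap(Long ticklerEvent, Collector<String> out) throws Exception {
        for (String dir : directories) {
            Path dirPath = new Path(dir);
            FileSystem fs = dirPath.getFileSystem();        // resolves hdfs:// or file://
            for (FileStatus status : fs.listStatus(dirPath)) {
                Path file = status.getPath();
                if (!file.getName().endsWith(".gz") || !alreadyProcessed.add(file.toString())) {
                    continue;                               // not a gzip file, or already read
                }
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(fs.open(file)), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.collect(line);                  // emit each record downstream
                    }
                } catch (IOException e) {
                    // A corrupt gzip file ("Not in GZIP format") lands here:
                    // log it and move on instead of failing the whole job.
                }
            }
        }
    }
}

The tickler events can come from any custom source that emits, say, a timestamp every few seconds. In a real job the set of already-processed files would normally live in Flink state rather than an in-memory HashSet (and the function would run with parallelism 1, or the directories would be partitioned by key) so the bookkeeping survives restarts.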

