apache-flink - 如何捕捉 flink 的 readFile(path) 抛出的异常?
问题描述
我使用 flink 来监控 hdfs 中的新文件(文件是 gzip 格式),并处理它们。
env.readFile(filePath)
它可以在文件有效时工作,
但如果 gzip 文件无效,flink 作业将被终止。
有异常日志:
java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[k.jar:?]
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:472) ~[k.jar:?]
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:49) ~[k.jar:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:381) ~[k.jar:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:88) ~[k.jar:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:112) ~[k.jar:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:322) ~[k.jar:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[k.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[k.jar:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[k.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[k.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.util.zip.ZipException: Not in GZIP format
at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165) ~[?:1.8.0_181]
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79) ~[?:1.8.0_181]
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) ~[?:1.8.0_181]
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:43) ~[k.jar:?]
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:32) ~[k.jar:?]
at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:848) ~[k.jar:?]
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:820) ~[k.jar:?]
... 16 more
我想跳过无效文件,而不是杀死 flink。
但是我不知道如何捕获异常,因为异常是由flink的内部代码抛出的。
我应该怎么办?
解决方案
我们有更具体的需求,因此我们最终编写了一个自定义FlatMapFunction
,该自定义由要检查的目录列表构成,并且会从自定义源接收常规的“tickler”事件。当它(在它的方法中)收到这个事件时,flatMap()
它会检查是否有新文件(匹配某些条件),如果有,它会打开文件,读取条目,并通过Collector
. 所以在这种情况下,我们可以完全控制错误处理。
推荐阅读
- reactjs - 反应 onClick 属性未触发
- postgresql - 如何将 Oracle 查询转换为 Postgresql 示例一些转换函数不起作用 Postgress
- java - Java Android Show with a recyclerview the strings of an array with a external layout
- c# - MSAL.NET 中的令牌缓存序列化不起作用
- visual-studio-code - 如何在声明后使 Prettier STOP DELETE “空格线”?
- javascript - 其他带有 toUpperCase()、toLowerCase() 和 Number.isInteger() 的 if 语句
- c# - Autofac 工作单元中的依赖注入
- amazon-web-services - AWS CodeBuild with Multi Docker Containers:无法准备上下文:无法评估 Dockerfile 路径中的符号链接
- node.js - 家庭助理 - 请求失败错误代码 400
- apache-kafka - 扩展消费者组的最大数量Kafka