apache-flink

How to catch the exception thrown from Flink's readFile(path)?


I use Flink to monitor HDFS for new files (gzip-compressed) and process them:

env.readFile(filePath)

This works when the file is valid, but if a gzip file is invalid, the Flink job is killed.

Here is the exception log:

java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:472) ~[k.jar:?]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:49) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:381) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:88) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:112) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:322) ~[k.jar:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) ~[k.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[k.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[k.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.util.zip.ZipException: Not in GZIP format
    at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79) ~[?:1.8.0_181]
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) ~[?:1.8.0_181]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:43) ~[k.jar:?]
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:32) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:848) ~[k.jar:?]
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:820) ~[k.jar:?]
    ... 16 more

I want to skip the invalid file rather than have the Flink job fail.

But I don't know how to catch the exception, because it is thrown from inside Flink's own code.

What should I do?


Solution

  • We had more specific needs, so we ended up writing a custom FlatMapFunction that was constructed with a list of directories to check and received a regular "tickler" event from a custom source. Whenever it got this event (in its flatMap() method), it checked for new files matching some criteria; for each one, it opened the file, read the entries, and emitted them via the Collector. In this setup the file is opened by our own code, so we have complete control over error handling: an exception from a corrupt file can be caught and the file skipped.
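Below is a minimal plain-Java sketch of the error-handling core of that approach, not the answerer's actual code. Assumptions: the class name GzipDirectoryScanner, the "*.gz" glob as the "matching criteria", and local java.nio paths instead of HDFS are all hypothetical; each call to onTick() stands in for flatMap() receiving a tickler event, and the java.util.function.Consumer stands in for Flink's Collector. The key point is that the GZIPInputStream is opened inside our own try block, so the "Not in GZIP format" ZipException (a subclass of IOException) can be caught and the file skipped instead of failing the task.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

// Hypothetical stand-in for the tickler-driven FlatMapFunction:
// onTick(out) plays the role of flatMap(tickEvent, collector).
class GzipDirectoryScanner {
    private final List<Path> directories;
    private final Set<Path> seen = new HashSet<>();       // files already processed (or skipped)
    private final List<Path> skipped = new ArrayList<>(); // files that could not be decoded

    GzipDirectoryScanner(List<Path> directories) {
        this.directories = directories;
    }

    // Called once per tick: emits every line of every new, valid *.gz file.
    void onTick(Consumer<String> out) throws IOException {
        for (Path dir : directories) {
            try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.gz")) {
                for (Path file : files) {
                    if (seen.add(file)) { // true only the first time this file is seen
                        emitFile(file, out);
                    }
                }
            }
        }
    }

    private void emitFile(Path file, Consumer<String> out) {
        // Buffer the lines so a file that fails part-way through emits nothing.
        List<String> lines = new ArrayList<>();
        try (InputStream raw = Files.newInputStream(file);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(new GZIPInputStream(raw), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) { // includes java.util.zip.ZipException: Not in GZIP format
            skipped.add(file);    // skip the bad file instead of failing the job
            return;
        }
        lines.forEach(out);
    }

    List<Path> getSkipped() {
        return skipped;
    }
}
```

Buffering each file's lines before emitting is one design choice: it avoids emitting half of a file that turns out to be truncated, at the cost of holding one file's lines in memory. In a real Flink job the set of already-seen files would probably need to live in Flink managed state so it survives restarts.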