Tags: amazon-s3, apache-flink, flink-streaming

Flink `textInputFormat` does not process GZ compressed files from aws S3 `Source` file system


I followed (ZIP compressed input for Apache Flink) and wrote the following code to process .gz log files in a directory with a plain TextInputFormat. It works against my local test directory: Flink scans the directory and transparently opens the .gz file contents. However, when I run it with an S3 bucket as the source, it does not process the .gz compressed files, although the same job still reads plain .log files from the bucket. It seems the .gz files are simply not decompressed. How can I resolve this on the S3 file system?

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public static void main(String[] args) throws Exception {

    final ParameterTool params = ParameterTool.fromArgs(args);
    final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs"
    final Long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().setGlobalJobParameters(params);

    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);

    // Monitor the directory continuously, re-scanning every 100 ms
    DataStream<String> stream = env.readFile(
            textInputFormat, sourceLogDirPath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

    stream.print();
    env.execute();
}

These are the Flink library jars on my classpath:

/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.2.jar:/opt/flink/lib/flink-table_2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentry_log4j2_deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::

P.S. I also tried s3a://<bucket>/ with no luck.


Solution

  • Maybe you can change the log level to DEBUG and observe whether the file is filtered out when the input splits are generated.

    By default, files beginning with '.' or '_' will be filtered out

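To check which of your S3 keys the default enumeration would skip, the filter rule quoted above can be reproduced in plain Java. This is only a sketch of the documented behavior of Flink's `FilePathFilter.createDefaultFilter()` (names beginning with `.` or `_` are excluded); the class and method names here are illustrative, not Flink's own:

```java
public class DefaultFilterCheck {

    // Mirrors the rule quoted above: files whose name begins with
    // '.' or '_' are filtered out by Flink's default FilePathFilter.
    static boolean isFilteredOut(String fileName) {
        return fileName.startsWith(".") || fileName.startsWith("_");
    }

    public static void main(String[] args) {
        System.out.println(isFilteredOut("app.log.gz"));       // false: processed
        System.out.println(isFilteredOut("_SUCCESS"));         // true: skipped
        System.out.println(isFilteredOut(".app.log.gz.crc"));  // true: skipped
    }
}
```

If the DEBUG logs show your .gz objects being filtered (some tools write S3 keys with leading underscores or dot-prefixed checksum files), you can replace the default filter by calling `textInputFormat.setFilesFilter(...)` with your own `org.apache.flink.api.common.io.FilePathFilter` implementation before passing the format to `env.readFile(...)`.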