hadoop, amazon-s3, apache-flink, flink-streaming

Flink using S3AFileSystem does not read subfolders from S3


We are using Flink 1.2.0 with the suggested S3AFileSystem configuration. A simple streaming job works as expected when its source is a single folder within an S3 bucket.

The job runs without errors, but produces no output, when its source is a folder that itself contains subfolders.

For clarity, below is a model of the S3 bucket. Running the job against s3a://bucket/folder/2017/04/25/01/ correctly reads all three objects there, as well as any objects that subsequently appear under that prefix. Pointing the job at s3a://bucket/folder/2017/ (or any other intermediate folder) yields a job that runs but produces nothing.

In fits of desperation, we've tried the permutations that [in|ex]clude the trailing /.

.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26

Job code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}
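
For context, the bucket and folder are supplied as program arguments and picked up by ParameterTool, so a submission looks roughly like the following (the jar name here is only a placeholder):

./bin/flink run flink-s3-job.jar --bucket bucket --folder folder/2017/04/25/01/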

core-site.xml is configured per the docs:

<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
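
Per the same AWS setup docs, Flink locates this file via the fs.hdfs.hadoopconf entry in flink-conf.yaml, which points at the directory containing core-site.xml; a minimal sketch, assuming the config lives under /etc/hadoop/conf:

fs.hdfs.hadoopconf: /etc/hadoop/conf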

We have included all the jars for S3AFileSystem listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27

We are stumped. This seems like it should work; there are plenty of breadcrumbs on the internet that indicate that this did work. [e.g., http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]

Help me, fellow squirrels... you're my only hope!


Solution

  • Answering my own question, with help from Steve Loughran (in the comments).

    In Flink, when a file-based data source is read continuously (PROCESS_CONTINUOUSLY), FileInputFormat does not enumerate nested files by default.

    This is true whether the source is S3 or anything else.

    You must enable it explicitly on the input format:

    def main(args: Array[String]) {

      val parameters = ParameterTool.fromArgs(args)
      val bucket = parameters.get("bucket")
      val folder = parameters.get("folder")

      val path = s"s3a://$bucket/$folder"

      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val textInputFormat = new TextInputFormat(new Path(path))

      // this is important! enumerate nested files so subfolders are read
      textInputFormat.setNestedFileEnumeration(true)

      val lines: DataStream[String] = env.readFile(
        inputFormat = textInputFormat,
        filePath = path,
        watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
        interval = Time.seconds(10).toMilliseconds)

      lines.print()
      env.execute("Flink Streaming Scala API Skeleton")
    }
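
    As an aside, the batch DataSet API exposes the same switch through the recursive.file.enumeration configuration parameter rather than a setter. A minimal sketch of that variant, shown only for comparison (the object name is just a placeholder):

    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    object NestedBatchRead {
      def main(args: Array[String]) {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // enable recursive traversal of nested folders for this source
        val parameters = new Configuration
        parameters.setBoolean("recursive.file.enumeration", true)

        // pass the configuration to the data source and print what it reads
        env.readTextFile("s3a://bucket/folder/2017/").withParameters(parameters).print()
      }
    }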