Search code examples
javascalaapache-beamspotify-scio

Apache beam wildcard recursive search for files


I am using Spotify's Scio library for writing apache beam pipelines in scala. I want to search for files under a directory in a recursive way on a filesystem which can be hdfs, alluxio or GCS. Like *.jar should find all the files under the provided directory and sub-directories.

Apache beam sdk provided org.apache.beam.sdk.io.FileIO class for such purpose where I can find files on one directory level using pipeline.apply(FileIO.match().filepattern(filesPattern)).

How can I make it recursive to search for all files matching the provided pattern?

Currently, I am trying another approach, where I am creating resourceId of the provided pattern and getting current directory of the provided pattern, then I am trying to resolve all sub-directories in the current directory using resourceId.resolve() method. But it is throwing an exception for it.

    val currentDir = FileSystems.matchNewResource(filesPattern, false).getCurrentDirectory
    val childDir = currentDir.resolve("{@literal *}", StandardResolveOptions.RESOLVE_DIRECTORY)

For currentDir.resolve I am getting following exception:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.IllegalArgumentException: Illegal character in path at index 0: {@literal *}/
        at java.net.URI.create(URI.java:852)
        at java.net.URI.resolve(URI.java:1036)
        at org.apache.beam.sdk.io.hdfs.HadoopResourceId.resolve(HadoopResourceId.java:46)
        at com.sparkcognition.foundation.ingest.jobs.copyjob.FileOperations$.findFiles(BinaryFilesSink.scala:110)
        at com.sparkcognition.foundation.ingest.jobs.copyjob.BinaryFilesSink$.main(BinaryFilesSink.scala:39)
        at com.sparkcognition.foundation.ingest.jobs.copyjob.BinaryFilesSink.main(BinaryFilesSink.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        ... 12 more
Caused by: java.net.URISyntaxException: Illegal character in path at index 0: {@literal *}/
        at java.net.URI$Parser.fail(URI.java:2848)
        at java.net.URI$Parser.checkChars(URI.java:3021)
        at java.net.URI$Parser.parseHierarchical(URI.java:3105)
        at java.net.URI$Parser.parse(URI.java:3063)
        at java.net.URI.<init>(URI.java:588)
        at java.net.URI.create(URI.java:850)
        ... 22 more

Please suggest what should be the right way to search for files recursively using apache beam?

References: https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/fs/ResourceId.html


Solution

  • It looks like you have some code copied from some faulty javadoc. Some old versions of the example code were published with errors around the asterisks.

    To find all files inside currentDir:

    val childDir = currentDir.resolve("**", StandardResolveOptions.RESOLVE_FILES)