I am using Spotify's Scio library for writing apache beam pipelines in scala. I want to search for files under a directory in a recursive way on a filesystem which can be hdfs, alluxio or GCS. Like *.jar should find all the files under the provided directory and sub-directories.
Apache beam sdk provided org.apache.beam.sdk.io.FileIO
class for such purpose where I can find files on one directory level using pipeline.apply(FileIO.match().filepattern(filesPattern))
.
How can I make it recursive to search for all files matching the provided pattern?
Currently, I am trying another approach, where I am creating resourceId of the provided pattern and getting current directory of the provided pattern, then I am trying to resolve all sub-directories in the current directory using resourceId.resolve()
method. But it is throwing an exception for it.
val currentDir = FileSystems.matchNewResource(filesPattern, false).getCurrentDirectory
val childDir = currentDir.resolve("{@literal *}", StandardResolveOptions.RESOLVE_DIRECTORY)
For currentDir.resolve
I am getting following exception:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.IllegalArgumentException: Illegal character in path at index 0: {@literal *}/
at java.net.URI.create(URI.java:852)
at java.net.URI.resolve(URI.java:1036)
at org.apache.beam.sdk.io.hdfs.HadoopResourceId.resolve(HadoopResourceId.java:46)
at com.sparkcognition.foundation.ingest.jobs.copyjob.FileOperations$.findFiles(BinaryFilesSink.scala:110)
at com.sparkcognition.foundation.ingest.jobs.copyjob.BinaryFilesSink$.main(BinaryFilesSink.scala:39)
at com.sparkcognition.foundation.ingest.jobs.copyjob.BinaryFilesSink.main(BinaryFilesSink.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 12 more
Caused by: java.net.URISyntaxException: Illegal character in path at index 0: {@literal *}/
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3063)
at java.net.URI.<init>(URI.java:588)
at java.net.URI.create(URI.java:850)
... 22 more
Please suggest what should be the right way to search for files recursively using apache beam?
References: https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/fs/ResourceId.html
It looks like you have some code copied from some faulty javadoc. Some old versions of the example code were published with errors around the asterisks.
To find all files inside currentDir:
val childDir = currentDir.resolve("**", StandardResolveOptions.RESOLVE_FILES)