I've been trying to use a MoveHDFS processor to move parquet files from a /working/partition/ directory in HDFS to a /success/partition/ directory. The partition value is set by an ExecuteSparkJob processor earlier in the flow. After finding my parquet files in the root / directory, I found the following in the processor description for Output Directory:

The HDFS directory where the files will be moved to. Supports Expression Language: true (will be evaluated using variable registry only)

It turns out the processor was sending the files to / instead of ${dir}/.
Since my attributes are set on the fly from the Spark processing result, I can't simply add them to the variable registry and restart the nodes for each flowfile (which, from my limited understanding, is what using the variable registry requires). One option is an ExecuteStreamCommand processor with a custom script. Is that my only option here, or is there a built-in way to move HDFS files into attribute-defined directories?
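For concreteness, the kind of script I have in mind (the script name, argument order, and the ${source.path} attribute are only illustrative) would be something like:

    #!/usr/bin/env python
    # move_to_partition.py - invoked from an ExecuteStreamCommand processor.
    # ExecuteStreamCommand supplies the arguments via its Command Arguments
    # property, e.g. ${source.path};${dir} (split on ";" by default).
    import subprocess
    import sys

    def move_hdfs_file(src, dest_dir):
        # Ensure the destination directory exists, then move the file
        # using the stock HDFS CLI.
        subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", dest_dir])
        subprocess.check_call(["hdfs", "dfs", "-mv", src, dest_dir])

    if __name__ == "__main__":
        move_hdfs_file(sys.argv[1], sys.argv[2])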
You can try this approach:
Step 1: Use MoveHDFS to move your file to a temporary location, say path X. The Input Directory property of the MoveHDFS processor can accept a flowfile attribute.
Step 2: Connect the success relationship to a FetchHDFS processor.
Step 3: In the FetchHDFS processor, set the HDFS Filename property to the Expression Language expression ${absolute.hdfs.path}/${filename}. This fetches the file data from path X into the flowfile content.
Step 4: Connect the success relationship from FetchHDFS to a PutHDFS processor.
Step 5: Configure the PutHDFS Directory property to use the flowfile attribute that carries the partition value set on the fly (see the worked configuration example after these steps).
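Concretely, assuming the Spark step sets a flowfile attribute named partition (a hypothetical name; substitute whatever your flow actually sets) to a value like 2021-06-01, the three processors might be configured as:

    MoveHDFS   Input Directory  = /working/${partition}
               Output Directory = /tmp/staging               (temporary path X)
    FetchHDFS  HDFS Filename    = ${absolute.hdfs.path}/${filename}
    PutHDFS    Directory        = /success/${partition}

With that attribute value, PutHDFS resolves its Directory to /success/2021-06-01 at runtime, with no variable registry involved.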
Cons: The one drawback of this approach is the duplicate copy MoveHDFS creates to stage the data temporarily before it reaches the final location. If that copy is not needed, you may have to build a separate flow to delete it.
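If the staging copy must not linger, the DeleteHDFS processor may be worth a look; alternatively, a second ExecuteStreamCommand script run after PutHDFS succeeds can remove it. A minimal sketch, assuming the staging path is still carried in the absolute.hdfs.path and filename attributes:

    #!/usr/bin/env python
    # cleanup_staging.py - removes the temporary copy left in path X.
    # ExecuteStreamCommand supplies the argument via its Command Arguments
    # property, e.g. ${absolute.hdfs.path}/${filename}
    import subprocess
    import sys

    if __name__ == "__main__":
        subprocess.check_call(["hdfs", "dfs", "-rm", sys.argv[1]])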