Tags: amazon-web-services, apache-nifi, flowfile

Generating a single flowfile for loading into S3


I have a NiFi flow that fetches data from RDS tables and loads it into S3 as flat files. Now I need to generate another file whose content is the name of the file being loaded into the S3 bucket; this needs to be a separate flow.

For example: if the extracted flat file is named RDS.txt, then the newly generated file should contain rds.txt as its content, and I need to load this file into the same S3 bucket.

The problem I face is that I am using a GenerateFlowFile processor and adding the flat file name as custom text in the flowfile, but I cannot set up any upstream connection for a GenerateFlowFile processor, so it keeps generating more files. If I use a MergeContent processor after GenerateFlowFile, I see duplicate values in the flowfile.

Can anyone help me out with this?


Solution

  • I have a NiFi flow that fetches data from RDS tables and loads it into S3 as flat files. Now I need to generate another file whose content is the name of the file being loaded into the S3 bucket; this needs to be a separate flow.

    The easiest path is to chain something after PutS3Object that updates the flowfile content with what you want. That is simple to write with ExecuteScript. Something like this:

    import org.apache.nifi.processor.io.OutputStreamCallback
    import java.nio.charset.StandardCharsets

    def ff = session.get()
    if (ff) {
      // Replace the flowfile content with the value of its "filename" attribute
      def updated = session.write(ff, {
        it.write(ff.getAttribute("filename").getBytes(StandardCharsets.UTF_8))
      } as OutputStreamCallback)
      // Mark the flowfile so it can be routed differently on its second pass
      updated = session.putAttribute(updated, "is_updated", "true")
      session.transfer(updated, REL_SUCCESS)
    }
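
    To run this, set the ExecuteScript processor's Script Engine property to Groovy and paste the script into its Script Body property. The session and REL_SUCCESS variables are bound automatically by ExecuteScript, so they do not need to be declared in the script.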
    

    Then you can put a RouteOnAttribute processor after PutS3Object and have it route flowfiles to a null (auto-terminated) route if the is_updated attribute is present, or back to PutS3Object if they have not been updated yet.
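
    A minimal sketch of that RouteOnAttribute configuration (the dynamic property name updated is arbitrary; the expression assumes the is_updated attribute set by the script above):

    Routing Strategy : Route to Property name
    updated          : ${is_updated:equals('true')}

    With the "Route to Property name" strategy, flowfiles matching the expression go to a relationship named updated, which you can auto-terminate; everything else goes to unmatched, which you connect back through ExecuteScript and PutS3Object.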