python groovy apache-nifi

Read a file in NiFi using an expression


I have a NiFi flow that is triggered by e-mail. The problem is that the ListFile and GetFile processors cannot be triggered externally (they do not accept incoming flowfiles).

What I need:

I have some files:

context variable #{folder_to_read} = /home/input
#{folder_to_read}/MyBranches_2023-10-22_225510.csv
#{folder_to_read}/MyAccounts_2023-10-22_225510.csv
#{folder_to_read}/MyOrders/USAOrders_2023-10-22_215510.csv
#{folder_to_read}/MyOrders/EUAOrders_2023-10-22_215610.csv
...

I need to:

  • receive an e-mail and create a flowfile (done by the ConsumeEWS processor)
  • find and read files in the input folder that match a pattern like MyBranches_${now():format('yyyy-MM-dd')}_*.csv
  • process the data in the file(s).

The problem is that I can't find a way to do this in one pipeline. The ListFile and GetFile processors cannot be triggered by e-mail, and FetchFile doesn't accept a regex-like pattern.

Could you share how to do this in NiFi? Maybe it is possible with the ExecuteScript processor and Python or Groovy?


Solution

  • The following answer uses Groovy, since the question owner accepts this language as well.


    I assume that the content of the incoming flowfile looks like this:

    #some comment
    filename1.ext
    filename2.ext2
    subfolder/filename3.ext3
    

    Use the ExecuteGroovyScript processor, add a base_path property to it (a dynamic property, which the script sees as a variable) that points to the folder with the files you want to read, and set the script body to:

    def ff = session.get()
    if(!ff) return
    
    //read lines from incoming file and filter comments and empty lines
    def lines = ff.read().withReader("UTF-8"){r-> r.readLines()}.findAll{s-> s && !s.startsWith('#')}
    
    def outFiles = []
    lines.each{s->
        def ffOut = ff.clone(false) //clone all attributes, but not content
        ffOut.filename = s
        new File("${base_path}/${s}").withInputStream{rawIn->
            ffOut.write{rawOut-> rawOut << rawIn} // import content from file
        }
        outFiles.add(ffOut)
    }
    
    ff.remove() //drop current flowfile
    REL_SUCCESS << outFiles //transfer to success new file list
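
The script above expects explicit file names in the incoming flowfile, while the question asks for names resolved from a date pattern such as MyBranches_<today>_*.csv. A minimal sketch of that matching step, in Python since the question allows it, might look like the following. The function name find_files and its parameters are hypothetical, it is plain Python that does not touch the NiFi API, and it assumes the date part of the file name is always yyyy-MM-dd.

```python
import fnmatch
import os
from datetime import date


def find_files(base_path, prefix, day=None):
    """Return paths, relative to base_path, of files named <prefix>_<yyyy-MM-dd>_*.csv.

    prefix may itself contain wildcards, e.g. "*Orders".
    day defaults to today's date.
    """
    day = day or date.today()
    pattern = "{0}_{1}_*.csv".format(prefix, day.strftime("%Y-%m-%d"))
    matches = []
    # os.walk also descends into subfolders such as MyOrders/
    for root, _dirs, files in os.walk(base_path):
        for name in fnmatch.filter(files, pattern):
            full = os.path.join(root, name)
            # Relative path with forward slashes, one per line is the
            # input format the Groovy script above assumes
            rel = os.path.relpath(full, base_path).replace(os.sep, "/")
            matches.append(rel)
    return sorted(matches)
```

With the folder layout from the question, find_files("/home/input", "MyBranches", date(2023, 10, 22)) would return the single relative path MyBranches_2023-10-22_225510.csv, and a prefix of "*Orders" would pick up both files under MyOrders/. Writing the returned names one per line gives content in the form the Groovy script reads.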