How to update line with modified data in Jython?


I have a CSV file that contains hundreds of thousands of rows. Below are some sample lines:

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

The last column of the CSV data is in the wrong datetime format, so I need to convert it to the default datetime format ("YYYY-MM-DD hh:mm:ss[.nnn]").

I have tried the following script to read the lines and write them into the flow file.

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
  session.transfer(flowFile, REL_SUCCESS)

But I cannot find a way to convert the data into the output below.

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,28-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

I have looked for solutions with my friend (Google) and still could not find one.

Can anyone guide me to convert those input data into my required output?


Solution

  • In this transformation the unnecessary data is located at the end of each line, so the task is easy to handle with a regular expression:

    ^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?
    

    Check the regular expression and explanation here: https://regex101.com/r/sAB4SA/2
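
As a quick check of the pattern, here is a minimal plain-Python sketch that applies it to the sample lines from the question. The helper name `trim_timestamp` is made up for illustration, and passing non-matching lines through unchanged is an assumption, not something the question specifies.

```python
import re

# Same pattern as above, written as a raw string.
# group(1): everything up to and including the seconds
# group(3): the dot plus at most three fractional digits (optional)
# group(4): surplus fractional digits, group(5): the trailing "-NN" offset
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def trim_timestamp(line):
    """Keep the line up to the seconds plus at most three fractional digits."""
    match = PATTERN.search(line)
    if match is None:
        # Assumption: a line without a time (e.g. a header) is left as-is.
        return line
    return match.group(1) + (match.group(3) or '')


samples = [
    '1,Ni,23,28-02-2015 12:22:33.2212-02',
    '3,Us,33,30-03-2015 12:23:35-01',
    '4,Uk,34,31-03-2015 12:24:36.332211-02',
]
for sample in samples:
    print(trim_timestamp(sample))
# 1,Ni,23,28-02-2015 12:22:33.221
# 3,Us,33,30-03-2015 12:23:35
# 4,Uk,34,31-03-2015 12:24:36.332
```

Because `.*` is greedy, group 1 always ends at the last `:dd` in the line, which is the seconds field in these samples.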

    Since you have a large file, it is better not to load it into memory. The following call loads the whole file into memory:

    IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    

    It is better to iterate line by line, so that only one line is held in memory at a time.
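
To illustrate the idea outside NiFi, here is a small plain-Python sketch of line-by-line streaming. The names `copy_lines` and `transform` are hypothetical, and `io.StringIO` objects merely stand in for the flow file's input and output streams.

```python
import io


def copy_lines(reader, writer, transform):
    """Stream lines from reader to writer, holding one line at a time."""
    count = 0
    for line in reader:  # file-like objects yield one line per iteration
        writer.write(transform(line.rstrip('\r\n')) + '\n')
        count += 1
    return count


# In-memory stand-ins for the real streams (assumption for the demo).
source = io.StringIO(u'a,1\nb,2\nc,3\n')
target = io.StringIO()

copied = copy_lines(source, target, lambda line: line.upper())
print(copied)             # 3
print(target.getvalue())  # A,1 / B,2 / C,3, one per line
```

Memory use then depends on the longest line rather than on the size of the file.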

    The following code is for the NiFi ExecuteScript processor with the python (Jython) script engine:

    import sys
    import re
    import traceback
    from org.apache.nifi.processor.io import StreamCallback
    from java.io import BufferedReader
    from java.io import InputStreamReader
    from java.io import OutputStreamWriter
    
    
    class TransformCallback(StreamCallback):
        def __init__(self):
            pass
    
        def process(self, inputStream, outputStream):
            try:
                writer = OutputStreamWriter(outputStream,"UTF-8")
                reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
                line = reader.readLine()
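                # Raw string so the backslashes reach the regex engine unchanged
                p = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
                while line is not None:
                    match = p.search(line)
                    if match is None:
                        # No time found (e.g. a header line): copy the line unchanged
                        writer.write(line)
                    else:
                        # Keep the text up to the seconds plus at most 3 fractional digits
                        writer.write(match.group(1) + (match.group(3) or ''))
                    writer.write('\n')
                    line = reader.readLine()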
                writer.flush()
                writer.close()
                reader.close()
            except:
                traceback.print_exc(file=sys.stdout)
                raise
    
    
    flowFile = session.get()
    if flowFile != None:
        flowFile = session.write(flowFile, TransformCallback())
    
        # Finish by transferring the FlowFile to an output relationship
        session.transfer(flowFile, REL_SUCCESS)
    

    Since the question is about NiFi, here are some alternatives that seem easier.


    The same transformation in Groovy for the NiFi ExecuteScript processor (note that this version also skips the first line):

    def ff = session.get()
    if(!ff)return
    ff = session.write(ff, {rawIn, rawOut->
        // ## transform streams into reader and writer
        rawIn.withReader("UTF-8"){reader->
            rawOut.withWriter("UTF-8"){writer->
                reader.eachLine{line, lineNum->
                    if(lineNum>1) { // # skip the first line
                        // ## use a regular expression to transform each line
                        writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '$1$3' ) << '\n'
                    }
                }
            }
        }
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)
    

    ReplaceText processor

    If a regular expression is acceptable, the easiest way in NiFi is the ReplaceText processor, which can perform a regular-expression replace line by line.

    In this case you don't need to write any code; just build the regular expression and configure the processor correctly.
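
Before configuring the processor, the replacement can be tried out in plain Python. The sketch below only emulates a line-by-line regex replace with the replacement `$1$3`; it is not NiFi code, and the function names are made up for illustration. In ReplaceText this would presumably correspond to a Search Value holding the pattern, a Replacement Value of `$1$3`, and a line-by-line Evaluation Mode, but check those property names against your NiFi version.

```python
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def replace_line(line):
    # Emulates a '$1$3' replacement: a group that did not take part in the
    # match contributes an empty string.
    return PATTERN.sub(lambda m: (m.group(1) or '') + (m.group(3) or ''), line)


def replace_line_by_line(text):
    # Apply the replacement to each line independently, keeping line endings.
    return ''.join(replace_line(line) for line in text.splitlines(True))


text = ('1,Ni,23,28-02-2015 12:22:33.2212-02\n'
        '3,Us,33,30-03-2015 12:23:35-01\n')
print(replace_line_by_line(text))
# 1,Ni,23,28-02-2015 12:22:33.221
# 3,Us,33,30-03-2015 12:23:35
```

A function is used as the replacement instead of a `\1\3` template because older Python versions, including the 2.7 level that Jython implements, raise an error when a referenced group did not match.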