
Modify CSV with Apache NiFi


I'm receiving a kinda weird .csv file from the FetchFTP processor. It looks like this:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

I can't save it to a database in this format. What I want:

  1. Remove this useless header block:

     Agency Name:IKEA,,,,,,,,,,,,,,
     Advertiser Name: Ingka,,,,,,,,,,,,,,
     Campaign Name:All,,,,,,,,,,,,,,
     Date Resolution:Days,,,,,,,,,,,,,,
     Campaign Dates:N/A,,,,,,,,,,,,,,
     Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
     Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

     You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

     ,,,,,,,,,,,,,,,

  2. Remove this useless footer:

     Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
     Viewability mode is set per individual campaign,,,,,,,,,,,,,,

  3. Change the header names.

     From: Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)

     To: advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps
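The header change in the last step is a fixed one-to-one rename, so whatever does the cleanup can simply look each original column up in a table. A minimal Python 3 sketch of that idea; HEADER_MAP and rename_header are hypothetical names, and the mapping is copied straight from the From/To lines above:

```python
import csv
import io

# Hypothetical lookup table: original report column -> desired database column
HEADER_MAP = {
    "Advertiser Name": "advertiser_name",
    "Advertiser ID": "advertiser_id",
    "Campaign Name": "campaign_name",
    "Campaign ID": "campaign_id",
    "Date": "report_date",
    "Site Name": "site",
    "Site ID": "site_id",
    "Device Type": "device",
    "Placement Name": "placement_name",
    "Placement ID": "placement_id",
    "Clickthrough URL": "url",
    "* Clicks": "clicks",
    "* Served Impressions": "imps",
    "* Total Recordable Impressions (IAB)": "total_record_imps",
    "* Total Viewable Impressions (IAB)": "total_view_imps",
}


def rename_header(header_line):
    """Return the header line with every column renamed via HEADER_MAP.

    Raises KeyError for a column the map does not know, so an unexpected
    change in the report layout is not silently loaded into the database.
    """
    columns = next(csv.reader([header_line]))
    renamed = [HEADER_MAP[name] for name in columns]
    out = io.StringIO()
    csv.writer(out, lineterminator="").writerow(renamed)
    return out.getvalue()
```

Failing loudly on an unknown column is a deliberate choice here: a silent pass-through would let a renamed report column reach the table under its original name.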

Are there any tools in Apache NiFi that can do this?


Solution

  • Your data needs some cleanup before it is valid CSV. You can use the ExecuteScript or ExecuteStreamCommand processor to run a data-cleaning script (in Python, say) that turns the incoming data into the format you want.

    The code snippet below (header standardization and data cleanup) shows how to access flowfile content from an ExecuteScript processor configured with Python as the script engine. ExecuteScript runs Python through Jython, which is why Java classes can be imported directly:

    from org.apache.nifi.processor.io import StreamCallback
    from org.apache.nifi.processors.script import ExecuteScript
    from org.python.core.util.FileUtil import wrap

    # Target column names, in the same order as the columns in the report
    NEW_HEADER = ("advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,"
                  "site,site_id,device,placement_name,placement_id,url,clicks,imps,"
                  "total_record_imps,total_view_imps")


    # Define a subclass of StreamCallback for use in session.write()
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass

        def process(self, inputStream, outputStream):
            with wrap(inputStream) as f:
                lines = f.readlines()

            outer_new_value_list = []
            is_csv_data = False
            for csv_row in lines:
                if not is_csv_data:
                    # Skip the preamble until the real CSV header row appears
                    if csv_row.startswith("Advertiser Name,Advertiser ID,"):
                        is_csv_data = True
                    else:
                        continue
                # Stop at the first footer line
                if csv_row.startswith("Data was updated last on"):
                    break
                outer_new_value_list.append(csv_row)

            if outer_new_value_list:
                # Replace the original header with the new column names,
                # keeping the header's original line ending
                header = outer_new_value_list[0]
                line_ending = header[len(header.rstrip("\r\n")):]
                outer_new_value_list[0] = NEW_HEADER + line_ending

            with wrap(outputStream, 'w') as filehandle:
                filehandle.writelines(outer_new_value_list)


    # end class
    flowFile = session.get()
    if flowFile is not None:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
    # implicit return at the end
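If you would rather not use Jython, the same cleanup can run as a standalone CPython script under ExecuteStreamCommand, which writes the flowfile content to the command's stdin and, with default settings, emits the command's stdout as the new content on the "output stream" relationship. A minimal sketch, assuming python3 is installed on the NiFi host and the script is saved at a hypothetical path such as /opt/nifi/scripts/clean_report.py (Command Path = python3, Command Arguments = that path). HEADER_PREFIX, FOOTER_PREFIX and clean_report are illustrative names, and the prefixes are taken from the sample report, so they would need updating if the report layout changes:

```python
import sys

# Assumed markers, copied from the sample report in the question
HEADER_PREFIX = "Advertiser Name,Advertiser ID,"
FOOTER_PREFIX = "Data was updated last on"
NEW_HEADER = ("advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,"
              "site,site_id,device,placement_name,placement_id,url,clicks,imps,"
              "total_record_imps,total_view_imps")


def clean_report(lines):
    """Keep only the CSV table: drop preamble and footer, swap in NEW_HEADER."""
    out = []
    in_table = False
    for line in lines:
        row = line.rstrip("\r\n")
        if not in_table:
            # Everything before the real header row is preamble
            if row.startswith(HEADER_PREFIX):
                in_table = True
                out.append(NEW_HEADER)
            continue
        # The first footer line ends the table
        if row.startswith(FOOTER_PREFIX):
            break
        if row:  # skip blank lines inside the table, if any
            out.append(row)
    return out


if __name__ == "__main__":
    # Flowfile content arrives on stdin; whatever we print becomes the new content
    rows = clean_report(sys.stdin)
    sys.stdout.write("\n".join(rows) + ("\n" if rows else ""))
```

Unlike the ExecuteScript version, this sketch normalizes line endings to \n and drops blank lines inside the table; if no header row is found it outputs nothing.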