
CSV to JSON with a dynamic schema using NiFi


I receive a CSV file from a third party. The schema of this file is dynamic. The only things I can be certain of are:

  1. Every column that contains data also has a header name.
  2. The file always has a header row.
  3. Header names are always strings of letters with no spaces or dots (so they are fairly "clean").
  4. Values should be treated as strings, because I am not sure what they will send.

To use this data in my system, I am thinking of staging it in MongoDB. The number, order, and names of the columns change from one load to the next, so I think a schemaless store like MongoDB will make a good staging area.

I read about the ConvertRecord processor, which is ideal for converting CSV to JSON, but I don't have a schema. I just want each row to become a document, with the header names as keys and the row's cells as values.

How should I go about it? Also, these files will be in the 25-30 GB range, so I do not want to bring my system down.

I tried writing my own processor (in Java) and was able to get the output I want, but it takes too long and does not seem optimal.
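If a custom Java route is kept, the key requirement for a 25-30 GB file is to stream it rather than load it. The following is a minimal sketch, not the asker's processor and not NiFi code. It reads one line at a time with `BufferedReader` and writes one JSON document per row (JSON Lines). Header names become keys, and every value is kept as a string. The class and method names are hypothetical. The sketch also makes several assumptions: input is comma-delimited with `"` quoting, quoted fields contain no line breaks, and JSON is written by hand rather than through a JSON library.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

public class CsvToJsonLines {

    // Splits one CSV line on commas, honouring double-quoted fields ("a, b")
    // and doubled quotes ("") inside them. Assumes no line breaks inside quotes.
    static List<String> splitLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"'); // escaped quote inside a quoted field
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    cur.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }

    // Escapes a value for use as a JSON string literal.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Streams CSV from `in` to `out` as JSON Lines: one {"header":"value",...}
    // object per data row, all values as strings. Only the current row is in memory.
    // Returns the number of rows written.
    static long convert(BufferedReader in, Writer out) throws IOException {
        String headerLine = in.readLine();
        if (headerLine == null) return 0;
        List<String> header = splitLine(headerLine);
        long rows = 0;
        String line;
        while ((line = in.readLine()) != null) {
            if (line.isEmpty()) continue; // skip blank lines
            List<String> values = splitLine(line);
            StringBuilder doc = new StringBuilder("{");
            for (int i = 0; i < header.size(); i++) {
                if (i > 0) doc.append(',');
                // Missing trailing cells become "" (an assumption); extra cells are dropped.
                String v = i < values.size() ? values.get(i) : "";
                doc.append(jsonString(header.get(i))).append(':').append(jsonString(v));
            }
            out.write(doc.append("}\n").toString());
            rows++;
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        String csv = "name,age,int_val,address\n"
                + "Rakesh Prasad,0,99,\"address 12 33333, 444441\"\n"
                + "rakesh Prasad3,,33,\"address 12 33333, 444444\"\n";
        StringWriter out = new StringWriter();
        convert(new BufferedReader(new StringReader(csv)), out);
        System.out.print(out);
    }
}
```

Running `main` prints one object per row. The first line is `{"name":"Rakesh Prasad","age":"0","int_val":"99","address":"address 12 33333, 444441"}`, and an empty cell, such as the missing `age` in the second row, becomes `""`.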

Can this be achieved with an existing processor?

Thanks, Rakesh

Update (09/05/2018): here is the NiFi template of the test flow I am using:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.2"><description></description><groupId>a2bd0551-0165-1000-7c6a-a32ca4db047c</groupId><name>csv_to_json_no_schema_v1</name><snippet>
<connections><id>91bc4a66-704c-3a2f-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>eb6cd54a-e1f1-3871-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections>
<connections><id>ad804e3c-f233-3556-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>64b15a56-8a5f-3297-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>invalid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections>
<connections><id>c30bd123-c436-36ce-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>valid</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>bb6c25ae-f2b6-386a-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections>
<connections><id>247d2139-26b7-31fe-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>1297bea9-b30f-3f45-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections>
<connections><id>45e5403f-99f7-3ddf-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>9f8f32f7-130c-35bd-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>defb04c4-c15c-3a07-0000-000000000000</groupId><id>8a0e37da-acd2-3d72-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections>
<controllerServices><id>88b0195a-34b2-34f0-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>Schema Write Strategy</key><value><name>Schema Write Strategy</name></value></entry><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>Pretty Print JSON</key><value><name>Pretty Print JSON</name></value></entry><entry><key>suppress-nulls</key><value><name>suppress-nulls</name></value></entry></descriptors><name>JsonRecordSetWriter</name><persistsState>false</persistsState><properties><entry><key>Schema Write Strategy</key><value>no-schema</value></entry><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>Pretty Print JSON</key></entry><entry><key>suppress-nulls</key></entry></properties><state>ENABLED</state><type>org.apache.nifi.json.JsonRecordSetWriter</type></controllerServices>
<controllerServices><id>c3e80a29-498b-36d4-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><bundle><artifact>nifi-record-serialization-services-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><comments></comments><descriptors><entry><key>schema-access-strategy</key><value><name>schema-access-strategy</name></value></entry><entry><key>schema-registry</key><value><identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService><name>schema-registry</name></value></entry><entry><key>schema-name</key><value><name>schema-name</name></value></entry><entry><key>schema-version</key><value><name>schema-version</name></value></entry><entry><key>schema-branch</key><value><name>schema-branch</name></value></entry><entry><key>schema-text</key><value><name>schema-text</name></value></entry><entry><key>csv-reader-csv-parser</key><value><name>csv-reader-csv-parser</name></value></entry><entry><key>Date Format</key><value><name>Date Format</name></value></entry><entry><key>Time Format</key><value><name>Time Format</name></value></entry><entry><key>Timestamp Format</key><value><name>Timestamp Format</name></value></entry><entry><key>CSV Format</key><value><name>CSV Format</name></value></entry><entry><key>Value Separator</key><value><name>Value Separator</name></value></entry><entry><key>Skip Header Line</key><value><name>Skip Header Line</name></value></entry><entry><key>ignore-csv-header</key><value><name>ignore-csv-header</name></value></entry><entry><key>Quote Character</key><value><name>Quote Character</name></value></entry><entry><key>Escape Character</key><value><name>Escape Character</name></value></entry><entry><key>Comment Marker</key><value><name>Comment Marker</name></value></entry><entry><key>Null String</key><value><name>Null String</name></value></entry><entry><key>Trim Fields</key><value><name>Trim Fields</name></value></entry><entry><key>csvutils-character-set</key><value><name>csvutils-character-set</name></value></entry></descriptors><name>CSVReader</name><persistsState>false</persistsState><properties><entry><key>schema-access-strategy</key></entry><entry><key>schema-registry</key></entry><entry><key>schema-name</key></entry><entry><key>schema-version</key></entry><entry><key>schema-branch</key></entry><entry><key>schema-text</key></entry><entry><key>csv-reader-csv-parser</key></entry><entry><key>Date Format</key></entry><entry><key>Time Format</key></entry><entry><key>Timestamp Format</key></entry><entry><key>CSV Format</key></entry><entry><key>Value Separator</key></entry><entry><key>Skip Header Line</key><value>true</value></entry><entry><key>ignore-csv-header</key><value>true</value></entry><entry><key>Quote Character</key></entry><entry><key>Escape Character</key></entry><entry><key>Comment Marker</key></entry><entry><key>Null String</key></entry><entry><key>Trim Fields</key></entry><entry><key>csvutils-character-set</key></entry></properties><state>ENABLED</state><type>org.apache.nifi.csv.CSVReader</type></controllerServices>
<processors><id>8a0e37da-acd2-3d72-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>0.0</x><y>227.99996948242188</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>record-reader</key><value><identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService><name>record-reader</name></value></entry><entry><key>record-writer</key><value><identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService><name>record-writer</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>record-reader</key><value>c3e80a29-498b-36d4-0000-000000000000</value></entry><entry><key>record-writer</key><value>88b0195a-34b2-34f0-0000-000000000000</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ConvertRecord</name><relationships><autoTerminate>false</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.ConvertRecord</type></processors>
<processors><id>9f8f32f7-130c-35bd-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>11.0</x><y>483.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors>
<processors><id>bb6c25ae-f2b6-386a-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>670.0</x><y>225.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>validate-csv-schema</key><value><name>validate-csv-schema</name></value></entry><entry><key>validate-csv-header</key><value><name>validate-csv-header</name></value></entry><entry><key>validate-csv-delimiter</key><value><name>validate-csv-delimiter</name></value></entry><entry><key>validate-csv-quote</key><value><name>validate-csv-quote</name></value></entry><entry><key>validate-csv-eol</key><value><name>validate-csv-eol</name></value></entry><entry><key>validate-csv-strategy</key><value><name>validate-csv-strategy</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>validate-csv-schema</key><value>NotNull,ParseInt(),Optional(ParseInt()),Null</value></entry><entry><key>validate-csv-header</key><value>true</value></entry><entry><key>validate-csv-delimiter</key><value>,</value></entry><entry><key>validate-csv-quote</key><value>"</value></entry><entry><key>validate-csv-eol</key><value>\n</value></entry><entry><key>validate-csv-strategy</key><value>Line by line validation</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ValidateCsv</name><relationships><autoTerminate>false</autoTerminate><name>invalid</name></relationships><relationships><autoTerminate>false</autoTerminate><name>valid</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.ValidateCsv</type></processors>
<processors><id>eb6cd54a-e1f1-3871-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>688.0</x><y>0.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>File Size</key><value><name>File Size</name></value></entry><entry><key>Batch Size</key><value><name>Batch Size</name></value></entry><entry><key>Data Format</key><value><name>Data Format</name></value></entry><entry><key>Unique FlowFiles</key><value><name>Unique FlowFiles</name></value></entry><entry><key>generate-ff-custom-text</key><value><name>generate-ff-custom-text</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry><entry><key>schema.name</key><value><name>schema.name</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>0B</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Text</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry><entry><key>generate-ff-custom-text</key><value>name,age,int_val,address
Rakesh Prasad,0,99,"address 12 33333, 444441"
rakesh Prasad1,1,,"address 12 33333, 444442"
rakesh Prasad2,2,55,"address 12 33333, 444443"
rakesh Prasad3,,33,"address 12 33333, 444444"</value></entry><entry><key>character-set</key><value>UTF-8</value></entry><entry><key>schema.name</key><value>empData</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>1 day</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors>
<processors><id>1297bea9-b30f-3f45-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>450.0</x><y>539.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors>
<processors><id>64b15a56-8a5f-3297-0000-000000000000</id><parentGroupId>defb04c4-c15c-3a07-0000-000000000000</parentGroupId><position><x>837.0</x><y>482.0000305175781</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors>
</snippet><timestamp>09/05/2018 01:32:27 EDT</timestamp></template>


Solution

  • You can use ConvertRecord with a CSVReader, and in the CSVReader set the Schema Access Strategy to "Use String Fields From Header". The reader then derives the schema dynamically from each file's header line and treats every column as a string.
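To illustrate what "Use String Fields From Header" amounts to, here is a small Java sketch. It builds an Avro record schema with one string field per header column. This is not NiFi's implementation. Two details are assumptions made for illustration: each field is nullable (`["null","string"]`), and the record name is supplied by the caller (`row` in `main` is hypothetical). The plain comma split is only safe because requirement 3 guarantees clean header names. The regex enforces the Avro naming rule, which also forbids spaces and dots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class HeaderSchema {

    // Avro names: a letter or underscore, then letters, digits or underscores.
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Rejects names Avro would not accept (e.g. ones containing spaces or dots).
    private static String checkName(String name) {
        if (!AVRO_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a valid Avro name: '" + name + "'");
        }
        return name;
    }

    // Builds an Avro record schema with one nullable string field per header column.
    // Splitting on ',' is enough only because header names contain no quotes or commas.
    static String stringSchemaFromHeader(String headerLine, String recordName) {
        List<String> fields = new ArrayList<>();
        for (String raw : headerLine.split(",", -1)) {
            String name = checkName(raw.trim());
            fields.add("{\"name\":\"" + name + "\",\"type\":[\"null\",\"string\"]}");
        }
        return "{\"type\":\"record\",\"name\":\"" + checkName(recordName)
                + "\",\"fields\":[" + String.join(",", fields) + "]}";
    }

    public static void main(String[] args) {
        // "row" is a hypothetical record name.
        System.out.println(stringSchemaFromHeader("name,age,int_val,address", "row"));
    }
}
```

With the header strategy, no schema text has to be written at all. A sketch like this is only relevant if a fixed schema were ever wanted for the reader's Schema Text property instead.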