google-cloud-platform, apache-nifi, batch-processing

Merging/adding fields inside of a CSV using NiFi


I have the following task/problem:

I have a static data file that is pushed into a BigQuery table. After that push, the flowfile is handed further and pushed into a second BQ table, BUT several fields need some logic applied before the file is pushed into that second table.

Example: there are 2 fields in the input file which should be merged into one field before the push into BQ. Concretely: the contents of fields GD270A and GD270B (both strings) should be concatenated and put into GD260.

The pipeline works as far as the first push, but I'm missing the merging logic. Is there a default processor for this, or do I need to develop a custom one?

The current setup is on GCP (1 VM running NiFi, 2 buckets and a BQ). The NiFi pipe looks like this: (image: the NiFi pipe)

Any help or idea is very much appreciated!

Edit:

Header of the CSV: GD622|GD622PW|KZLOESCH|KZLOEDAT|GD100A|GD270A|GD270B|GD260|GD240|GD245|GD621|GD170|GD171|GD172|GD198A|GD198B|GD198C|GD198D|GD198E|GD198F|GD198G|GD455A|GD630A|GD455D|GD660|GD669|GD867A|GD205C|GD161|GD432|GD432A|GD649|FTRELEV|FTGUEAB|FTGUEBIS|GD968D|GD160|GD650A|GD630B|GD226|CUSIPNR|GD200|GD211|GD212|GD213|GD214|GD220|GD220A|GD221|GD225|GD228|GD230|GD258U|GD280A|GD290A|GD300|GD311A|GD311B|GD312|GD321|GD322|GD352|GD352A|GD400|GD481|GD545|GD636|GD685G|GD685H|GD801A|GD801AJN|GD802|GD802A|GD803E|GD804E|GD805|GD806|GD806A|GD808|GD808A|GD808B|GD808C|GD809A|GD811|GD815B|GD815C|GD821B|GD861A|GD861E|GD861F|GD862|GD910|GD910A|GD924|GD924B|GD970A|GD970I

One example line: ||1||1|VALL|oN||EUR|STK|EUR|U202|ZZZZ|ZZZZ|+000000000000000001.000000000||||||K6431|AD||||||||043|||50||+00028|4|956|36||+129|043||B|0916|SH|||||13|||||||049||7||N||||||||||||||||||||||||||

The values of GD270A and GD270B (here VALL and oN) are the fields which should be merged into GD260 as follows:

||1||1|VALL|oN|VALLoN|EUR|STK|EUR|U202|ZZZZ|ZZZZ|+000000000000000001.000000000||||||K6431|AD||||||||043|||50||+00028|4|956|36||+129|043||B|0916|SH|||||13|||||||049||7||N||||||||||||||||||||||||||

Hope this helps; "|" obviously is the delimiter :)


Solution

  • Sounds like you should be able to do this with a single UpdateRecord processor; see its Additional Details page for examples.

    Set the Replacement Value Strategy to Record Path Value; see the docs for how to use Record Paths.

    Add a user-defined property to the UpdateRecord processor with the name /GD260 and the value concat(/GD270A, /GD270B), which uses the concat function.
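
    To make the expected result concrete, here is a minimal Python sketch of what the concat(/GD270A, /GD270B) record path should do to each record. It is only an illustration, not NiFi code: the function name merge_fields is hypothetical, and the header and row are a shortened excerpt of the sample from the question. In NiFi itself, the record reader and writer configured on UpdateRecord would presumably also need their Value Separator set to "|" for this file.

```python
import csv
import io

# Shortened excerpt of the header and sample row from the question
# (assumption: only five of the columns are kept, for readability).
HEADER = "GD100A|GD270A|GD270B|GD260|GD240"
ROW = "1|VALL|oN||EUR"


def merge_fields(csv_text, delimiter="|"):
    """Set GD260 to GD270A + GD270B for every record (hypothetical helper)."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    out = io.StringIO()
    writer = csv.DictWriter(
        out,
        fieldnames=reader.fieldnames,
        delimiter=delimiter,
        lineterminator="\n",
    )
    writer.writeheader()
    for record in reader:
        # Same effect as the UpdateRecord property /GD260 = concat(/GD270A, /GD270B)
        record["GD260"] = record["GD270A"] + record["GD270B"]
        writer.writerow(record)
    return out.getvalue()


result = merge_fields(HEADER + "\n" + ROW + "\n")
print(result)
```

    The second line of the printed output should carry VALLoN in the GD260 column, matching the desired line shown in the question.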