Tags: json, database, field, apache-nifi

How to insert a whole JSON record (as a single JSON string) from a NiFi flow file into a JSON column of a database


I am using NiFi to load data from database A as JSON records, and I want to insert each JSON record into a json column of a table in database B.

A => ExecuteSQLRecord => [jsonRow,...] => [?] => B.table(data json)

Which processor should I use at position [?]? I tried Jolt and PutDatabaseRecord, but it failed:

[jsonRow,...] => jolt => [{data:jsonRow},...] => PutDatabaseRecord => B.table(data json)

I got this error: Batch entry 0 insert into schema.table(data) values(('MapRecord [{...}]')) was aborted: ERROR: invalid input syntax for type json Detail: Token "MapRecord" is invalid. Where: JSON data, line 1: MapRecord... unnamed portal parameter $1 = '...' Call getNextException to see other errors in the batch.
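The error suggests the json column received the string form of a NiFi record object (`MapRecord [...]`) rather than JSON text, so PostgreSQL rejects it at the very first token. The sketch below illustrates that distinction, assuming Python's `json` parser as a rough stand-in for PostgreSQL's JSON input check (the two are not identical; for example, Python accepts `NaN`). The function name `is_valid_json` and the sample row are hypothetical.

```python
import json


def is_valid_json(text: str) -> bool:
    """Return True if `text` parses as JSON (Python's parser used as a stand-in)."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


# The bound value from the failing insert, copied verbatim from the error message.
bound_value = "MapRecord [{...}]"

# A hypothetical row serialized as real JSON text instead.
json_value = json.dumps({"id": 1, "name": "a"})

print(is_valid_json(bound_value))  # False: "MapRecord" is not a JSON token
print(is_valid_json(json_value))   # True
```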

[2024-08-03] A new test shows that it is possible to do this with just Jolt and PutDatabaseRecord. The flow is:

GenerateFlowFile => JoltTransformJSON => PutDatabaseRecord

The data path is:

[{id,name},{id,name}] => [{data:{id,name}},{data:{id,name}}] => pgTable(id serial, data json)

With this flow, PostgreSQL received two rows of data as expected, which tells me it is feasible to do this simply with Jolt and PutDatabaseRecord. However, the problem appears in the production environment. Even so, to keep the flow as simple as possible, I would rather first do deeper testing and further reading to find the cause in the production environment.

[2024-08-03] A new test shows that null values play a role in the problem. If a JSON row has any property with a null value, the row is wrapped as 'MapRecord[(...)]' before being sent to the database, which leads to a JSON parsing error. However, I don't want to remove null properties unless I have no alternative, because field-omission testing is also part of the data validation in the database. Keeping the null values and getting rid of the 'MapRecord' wrapper is my preference.
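Why dropping nulls is not equivalent for validation: once a row is stored as JSON, "present with null" and "omitted" are distinguishable, and a field-omission check relies on that. The Python sketch below shows the difference after a JSON round trip; the row and the helper `drop_nulls` are hypothetical illustrations of the workaround the question wants to avoid, not NiFi behavior.

```python
import json
from typing import Any


def drop_nulls(row: dict[str, Any]) -> dict[str, Any]:
    """The workaround to avoid: omit every null-valued property."""
    return {k: v for k, v in row.items() if v is not None}


row = {"id": 1, "name": None}  # hypothetical row with a null property

kept = json.loads(json.dumps(row))
dropped = json.loads(json.dumps(drop_nulls(row)))

# With nulls kept, the key is present and holds a JSON null.
print("name" in kept, kept["name"])  # True None
# With nulls dropped, the key disappears, which a field-omission check would flag.
print("name" in dropped)  # False
print(json.dumps(row))  # {"id": 1, "name": null}
```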


Solution

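  • UpdateRecord might be the simplest way. An UpdateRecord processor with the property /data set to the value escapeJson(/data) (with Replacement Value Strategy set to "Record Path Value", so the value is evaluated as a RecordPath) does the job: it replaces the nested record in /data with its JSON string. Anyone who wants to do this needs to understand NiFi RecordPath; see https://nifi.apache.org/documentation/nifi-2.0.0-M4/html/record-path-guide.html.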
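To make the effect of `/data = escapeJson(/data)` concrete, here is a rough Python analogue, not NiFi code: each record's `data` field is replaced by its JSON text while null-valued properties are kept. The helper name `escape_json_field` is hypothetical, and the compact separators are an assumption; NiFi's exact spacing and key order may differ.

```python
import json
from typing import Any


def escape_json_field(records: list[dict[str, Any]], field: str = "data") -> list[dict[str, Any]]:
    """Replace `field` in each record with its JSON text, leaving other fields alone.

    Rough analogue of UpdateRecord with /data = escapeJson(/data); the output
    formatting here (compact separators) is an assumption.
    """
    out = []
    for record in records:
        updated = dict(record)  # shallow copy so the input records are not mutated
        updated[field] = json.dumps(record[field], separators=(",", ":"))
        out.append(updated)
    return out


records = [{"data": {"id": 1, "name": None}}, {"data": {"id": 2, "name": "b"}}]
for r in escape_json_field(records):
    print(r["data"])
# {"id":1,"name":null}
# {"id":2,"name":"b"}
```

Because `data` is now a plain string containing valid JSON, the json column can parse it, and null properties survive instead of triggering the `MapRecord` string form.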