I have a flow with two processors: QueryDatabaseTable -> PutDatabaseRecord.
The data goes from Oracle to PostgreSQL via AvroReader.
This is the result of the query from Oracle:
nmcl_id|assortment_id|nname                                                          |aname      |modify_date        |load_date|
-------+-------------+---------------------------------------------------------------+-----------+-------------------+---------+
      1|            7|ДЛЯ РОЗНИЦЫ: ПАКЕТ-МАЙКА МАЛЫЙ 30+16Х50 НА КАССЫ ПЛАТНЫЙ       |без ассорт.|2021-06-25 10:54:10|         |
      3|            7|ДЛЯ РОЗНИЦЫ:ПАКЕТ-МАЙКА БОЛЬШОЙ 46+22Х60 (ЛОГОТИП "СЕТЬ МАКСИ")|без ассорт.|2021-06-25 10:54:10|         |
     78|            7|КАКАО-НАПИТОК "MIX FIX" 375ГР ПЛАСТИК (6811)                   |без ассорт.|2021-06-25 10:54:10|         |
This is the DDL on the Postgres side:
CREATE TABLE src.task_meta_data (
    id SERIAL,
    nmcl_id int NOT NULL,
    assortment_id int NOT NULL,
    nname character varying(100) NOT NULL,
    aname character varying(100) NOT NULL,
    modify_date timestamp NOT NULL,
    load_date timestamp NULL
);
ALTER TABLE ONLY src.task_meta_data
    ADD CONSTRAINT pk_task_meta_data PRIMARY KEY (id);
This is the config of the source:
This is the config of the target:
And this is the error:
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | 2021-06-30 21:50:12,937 ERROR [Timer-Driven Process Thread-8] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=5de90010-017a-1000-94c8-eb00fa683473] Failed to put Records to database for StandardFlowFileRecord[uuid=778ace8f-464b-4162-ab52-49451ee94990,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625074391585-2, container=default, section=2], offset=0, length=2951648],offset=0,name=778ace8f-464b-4162-ab52-49451ee94990,size=13864]. Routing to failure.: java.lang.NumberFormatException: For input string: "КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !"
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | java.lang.NumberFormatException: For input string: "КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !"
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Integer.parseInt(Integer.java:580)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Integer.parseInt(Integer.java:615)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.toInteger(DataTypeUtils.java:1594)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:200)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:153)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:149)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:709)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Thread.run(Thread.java:748)
OK, so it seems that there is a bug in NiFi: it picks the column types for the insert not by the field names in the file, but by column order.
So it tried to convert the value of the nname column to int, because it used the type of assortment_id. The shift comes from the id column, which exists in the table but not in the FlowFile.
I mean, we have this column order in the DB and this field order in the FlowFile:
DB: id, nmcl_id, assortment_id, nname, aname, modify_date, load_date
FF: nmcl_id, assortment_id, nname, aname, modify_date, load_date
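To make the shift concrete, here is a minimal Python sketch. It is not NiFi's code, only an illustration of what I think is happening; the function names `types_by_position` and `types_by_name` are made up for this example, and the type labels are simplified from the DDL above.

```python
# Illustration only: NOT NiFi's implementation, just the suspected shift.
# Table columns in DB order, with simplified type labels from the DDL.
db_columns = [
    ("id", "int"),
    ("nmcl_id", "int"),
    ("assortment_id", "int"),
    ("nname", "varchar"),
    ("aname", "varchar"),
    ("modify_date", "timestamp"),
    ("load_date", "timestamp"),
]

# Field order in the FlowFile (there is no id field).
ff_fields = ["nmcl_id", "assortment_id", "nname", "aname", "modify_date", "load_date"]


def types_by_position(fields, columns):
    """Pair the i-th record field with the type of the i-th table column."""
    return {field: columns[i][1] for i, field in enumerate(fields)}


def types_by_name(fields, columns):
    """Look up each record field's type by its column name."""
    type_of = dict(columns)
    return {field: type_of[field] for field in fields}


by_position = types_by_position(ff_fields, db_columns)
by_name = types_by_name(ff_fields, db_columns)

print(by_position["nname"])  # int (the type of assortment_id, one slot to the left)
print(by_name["nname"])      # varchar
```

With positional pairing, nname lands on the slot of assortment_id and gets an integer type, which matches the NumberFormatException in the log. With name-based lookup the extra id column does no harm.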
So I just removed the id column and made nmcl_id and assortment_id a composite primary key.