Hi all, I am consuming a stream of messages from a Kafka topic in NiFi via a consumer process. Each message is JSON in the following format (dummy values, but the structure matches the original):
{
  "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      { "field": "emp_id", "type": "string" },
      { "field": "emp_name", "type": "string" },
      { "field": "city", "type": "string" },
      { "field": "emp_sal", "type": "string" },
      { "field": "manager_name", "type": "string" }
    ]
  },
  "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"
  }
}
As you can see, the actual table name is under schema and the column values are under payload. I am able to parse the column values and put them into an HBase table using the EvaluateJsonPath and PutHBaseJson processors in NiFi.
So far, though, I have only been able to set the table name and row id manually. What I want is to fetch the table name (emp_table in the example above) and the row id (emp_id in the example above) from the JSON at runtime, and pass those values to the PutHBaseJson processor along with the column values.
You should be able to add more JsonPath expressions to EvaluateJsonPath as dynamic properties, something like:
table = $.schema.name
rowid = $.payload.emp_id
Then in PutHBaseJson set Table Name to ${table} and Row Identifier to ${rowid} (or whatever you named the properties in EvaluateJsonPath). With the processor's Destination set to flowfile-attribute, those values become attributes on each flow file and are resolved per message at runtime.
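If it helps to see what those two JsonPath expressions pull out of the message, here is a minimal Python sketch of the equivalent lookups using only the standard library (plain dictionary access standing in for `$.schema.name` and `$.payload.emp_id` — this is just an illustration, not NiFi code):

```python
import json

# A message in the same shape as the Kafka record shown above (dummy values).
message = '''{
  "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [{ "field": "emp_id", "type": "string" }]
  },
  "payload": { "emp_id": "1", "emp_name": "abc" }
}'''

record = json.loads(message)

# Equivalent of the JsonPath expression $.schema.name
table = record["schema"]["name"]      # the HBase table name

# Equivalent of the JsonPath expression $.payload.emp_id
rowid = record["payload"]["emp_id"]   # the HBase row identifier

print(table, rowid)  # emp_table 1
```

In NiFi, EvaluateJsonPath does this extraction for you and exposes the results as flow-file attributes, which is why ${table} and ${rowid} can then be referenced in PutHBaseJson's properties.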