json parsing hbase apache-nifi

Parsing an incoming JSON message from a NiFi stream into an HBase table


Hi all, I am receiving a stream of messages from a Kafka topic in NiFi, which I read with a consumer processor. Each message is JSON in the following format (the values are dummies, but the structure matches the original):

{
  "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      {
        "field": "emp_id",
        "type": "string"
      },
      {
        "field": "emp_name",
        "type": "String"
      },
      {
        "field": "city",
        "type": "string"
      },
      {
        "field": "emp_sal",
        "type": "string"
      },
      {
        "field": "manager_name",
        "type": "string"
      }
    ]
  },
  "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"
  }
}

As you can see, the actual table name is under schema and the column values are under payload. I am able to parse the column values and put them into an HBase table using the EvaluateJsonPath and PutHBaseJson processors in NiFi.

So far I can only set the table name and row ID manually. I want to fetch the table name (emp_table in the example above) and the row ID (the emp_id value in the example above) from the JSON and supply them to the PutHBaseJson processor at runtime, along with the column values.



Solution

  • You should be able to add another JSON path expression to EvaluateJsonPath, something like:

    table = $.schema.name

    Then in PutHBaseJson set the table name to ${table}, or whatever you named the property in EvaluateJsonPath.
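
    To make the lookup concrete, here is a minimal Python sketch of what the two JSON paths would resolve to for the sample message. It is not NiFi code. The function name extract_routing_attributes and the attribute name row_id are made up for illustration, and the row ID path $.payload.emp_id is an assumption based on the question. In NiFi, the equivalent would presumably be two properties on EvaluateJsonPath (with its Destination set to flowfile-attribute so the results become attributes), with PutHBaseJson's row identifier set to ${row_id} in the same way as the table name.

```python
import json

# Sample message in the same shape as the one in the question (dummy values).
MESSAGE = """
{
  "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [{"field": "emp_id", "type": "string"}]
  },
  "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"
  }
}
"""


def extract_routing_attributes(raw, row_id_field="emp_id"):
    """Return the values the two JSON paths would yield (hypothetical helper)."""
    doc = json.loads(raw)
    return {
        "table": doc["schema"]["name"],          # same value as $.schema.name
        "row_id": doc["payload"][row_id_field],  # same value as $.payload.emp_id
    }


attrs = extract_routing_attributes(MESSAGE)
print(attrs)
```

    The payload object is already a flat map of column names to values, which is the part that goes into the table as columns, while the two extracted values only decide where the row is written.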