json hdfs apache-nifi minify

Extract a certain JSON object in NiFi


I'm trying to extract JSON objects and store them in HDFS. I'm targeting the message attribute, whose value is a6,b6,c6,d6,e6.

JSON sample:

{
   "@timestamp":"2020-07-06T07:35:29.047Z",
   "@metadata":{
      "beat":"filebeat",
      "type":"_doc",
      "version":"7.7.1"
   },
   "log":{
      "offset":91,
      "file":{
         "path":"C:\\Program Files\\Filebeat\\test-kafka\\test_csv.csv"
      }
   },
   "message":"a6,b6,c6,d6,e6",
   "input":{
      "type":"log"
   },
   "ecs":{
      "version":"1.5.0"
   },
   "host":{
      "name":"host"
   },
   "agent":{
      "version":"7.7.1",
      "type":"filebeat",
      "ephemeral_id":"0b4a288f-f7ac-4db9-835e-60ca07a45fff",
      "hostname":"host",
      "id":"5e2fec03-bbdc-4f91-acc9-4ab36c7268db"
   }
}


GenerateFlowFile properties: I copied the JSON sample into Custom Text.

EvaluateJsonPath properties: I added a message property with a JSON query.

The problem is that EvaluateJsonPath is not working as I expected. I thought it would extract only the message attribute, but the file written to HDFS still contains the whole document:

hadoop@ambari:~$ hdfs dfs -cat /user/test/5a422f02-9074-4384-a3c9-f3e3ce7c2e40
{
   "@timestamp":"2020-07-06T07:35:29.047Z",
   "@metadata":{
      "beat":"filebeat",
      "type":"_doc",
      "version":"7.7.1"
   },
   "log":{
      "offset":91,
      "file":{
         "path":"C:\\Program Files\\Filebeat\\test-kafka\\test_csv.csv"
      }
   },
   "message":"a6,b6,c6,d6,e6",
   "input":{
      "type":"log"
   },
   "ecs":{
      "version":"1.5.0"
   },
   "host":{
      "name":"host"
   },
   "agent":{
      "version":"7.7.1",
      "type":"filebeat",
      "ephemeral_id":"0b4a288f-f7ac-4db9-835e-60ca07a45fff",
      "hostname":"host",
      "id":"5e2fec03-bbdc-4f91-acc9-4ab36c7268db"
   }
}

Am I missing something?


Solution

  • Since you used EvaluateJsonPath with the destination set to flow file attributes, it extracted message into a flow file attribute, and the content of the flow file is still the same as it was before. PutHDFS writes the flow file content, which is why the full JSON ends up in HDFS. You would need to use another processor, such as AttributesToJSON, before PutHDFS to rewrite the flow file content with the attributes you want. An alternative might be to set the EvaluateJsonPath destination to flow file content, but I'm not sure whether that produces valid JSON.
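
To make the attribute-versus-content distinction concrete, here is a minimal Python sketch of the behaviour described above. It is only a toy model, not NiFi code: the `FlowFile` class and the two helper functions are hypothetical stand-ins for a flow file, for EvaluateJsonPath with an attribute destination, and for AttributesToJSON with a content destination. It also assumes a top-level key lookup in place of a real JSONPath query such as `$.message`, and uses a shortened version of the sample document.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Toy stand-in for a NiFi flow file: content plus string attributes."""
    content: str
    attributes: dict = field(default_factory=dict)


def evaluate_json_path_to_attribute(ff, attr_name, key):
    """Hypothetical model of EvaluateJsonPath writing to an attribute.

    Only a top-level key lookup is modelled (the equivalent of $.<key>).
    The flow file content is deliberately left untouched.
    """
    value = json.loads(ff.content)[key]
    ff.attributes[attr_name] = str(value)
    return ff


def attributes_to_json_content(ff, attr_names):
    """Hypothetical model of AttributesToJSON writing to the content.

    Replaces the content with a JSON object built from the named attributes.
    """
    selected = {name: ff.attributes[name] for name in attr_names}
    ff.content = json.dumps(selected)
    return ff


# Shortened version of the sample document from the question.
sample = json.dumps({
    "@timestamp": "2020-07-06T07:35:29.047Z",
    "message": "a6,b6,c6,d6,e6",
    "input": {"type": "log"},
})

ff = FlowFile(content=sample)

# Step 1: the extraction only adds an attribute; the content is unchanged,
# so writing the flow file out at this point would store the full document.
evaluate_json_path_to_attribute(ff, "message", "message")
content_after_evaluate = ff.content

# Step 2: rewriting the content from the attribute yields only the message.
attributes_to_json_content(ff, ["message"])
content_after_rewrite = ff.content

print(content_after_rewrite)  # {"message": "a6,b6,c6,d6,e6"}
```

In the real flow, the second step corresponds to placing an AttributesToJSON processor between EvaluateJsonPath and PutHDFS, so that what reaches HDFS is the rewritten content rather than the original document.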