How can I filter records by attribute in a JSON file using NiFi?


I'm working with a batch JSON file with the following content:

[
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "ES",
    "resourceType": "Party"
  },
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "ES",
    "resourceType": "Party"
  },
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "FR",
    "resourceType": "Party"
  },
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "FR",
    "resourceType": "Party"
  }
]

I'm able to extract attributes from the JSON by using the EvaluateJsonPath processor.

UpdateAttribute is the processor where I want to extract the attributes. Please find below a snapshot of the UpdateAttribute processor when the value of "country" is "ES".

I want to get a JSON file like this:

[
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "ES",
    "resourceType": "Party"
  },
  {
    "eventType": "UPDATE",
    "eventTime": "2021-12-14T12:34:56.789012Z",
    "country": "ES",
    "resourceType": "Party"
  }
]



Solution

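  • Use a QueryRecord processor with the following properties:

    • Record Reader: JsonTreeReader
    • Record Writer: JsonRecordSetWriter
    • filter (dynamic property): SELECT * FROM FLOWFILE WHERE country='ES'

    The name of the dynamic property (filter here) becomes the name of the relationship that receives the matching records.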

    Input (JSON):

    [
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "ES",
        "resourceType": "Party"
      },
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "ES",
        "resourceType": "Party"
      },
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "FR",
        "resourceType": "Party"
      },
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "FR",
        "resourceType": "Party"
      }
    ]
    

    Output (JSON):

    [
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "ES",
        "resourceType": "Party"
      },
      {
        "eventType": "UPDATE",
        "eventTime": "2021-12-14T12:34:56.789012Z",
        "country": "ES",
        "resourceType": "Party"
      }
    ]
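
To sanity-check the WHERE clause outside NiFi, the query above can be run against the sample records with a minimal Python sketch. It is only an approximation: QueryRecord does not use SQLite, so this assumes the simple equality filter behaves the same way in both engines. The helper name `filter_records` is hypothetical and is not part of NiFi.

```python
import json
import sqlite3

# Sample records copied from the question.
RECORDS = json.loads("""
[
  {"eventType": "UPDATE", "eventTime": "2021-12-14T12:34:56.789012Z", "country": "ES", "resourceType": "Party"},
  {"eventType": "UPDATE", "eventTime": "2021-12-14T12:34:56.789012Z", "country": "ES", "resourceType": "Party"},
  {"eventType": "UPDATE", "eventTime": "2021-12-14T12:34:56.789012Z", "country": "FR", "resourceType": "Party"},
  {"eventType": "UPDATE", "eventTime": "2021-12-14T12:34:56.789012Z", "country": "FR", "resourceType": "Party"}
]
""")


def filter_records(records, query):
    """Hypothetical helper: load records into an in-memory SQLite table
    named FLOWFILE (mirroring the table name used in the NiFi query)
    and return the rows selected by the query as dictionaries."""
    columns = list(records[0].keys())
    conn = sqlite3.connect(":memory:")
    try:
        conn.row_factory = sqlite3.Row
        # Untyped columns are enough for this equality filter.
        conn.execute(f"CREATE TABLE FLOWFILE ({', '.join(columns)})")
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany(
            f"INSERT INTO FLOWFILE VALUES ({placeholders})",
            [tuple(record[c] for c in columns) for record in records],
        )
        return [dict(row) for row in conn.execute(query)]
    finally:
        conn.close()


# Same SQL text as the "filter" dynamic property in the answer.
filtered = filter_records(RECORDS, "SELECT * FROM FLOWFILE WHERE country='ES'")
print(json.dumps(filtered, indent=2))
```

Running it prints only the two records whose country is ES, which matches the output expected from the filter relationship.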