apache-nifi

How to detect duplicate records based on specific field values at the row level using NiFi?


Let's say you are ingesting a file, or even multiple files, over different periods of time. If the records in the incoming flowfile come from a daily flat-file load, for example, you need a way to detect and remove duplicate records based on specific fields.

Given the dataset below, we want to remove duplicate rows so that each combination of company name and email address appears only once:

CompanyName   Name   EmailAddress       Rate
Big Org A     John   john@example.com   105
Big Org B     Mike   mike@example.com   130
Big Org A     John   john@example.com   140
Big Org C     Brad   brad@example.com   110

So that we would be left with this unique dataset based on UNIQUE(CompanyName,EmailAddress):

CompanyName   Name   EmailAddress       Rate
Big Org A     John   john@example.com   105
Big Org B     Mike   mike@example.com   130
Big Org C     Brad   brad@example.com   110

How could we go about achieving this when receiving multiple files over possibly different time periods like a daily flat file import?


Solution

  • DeduplicateRecord NiFi Processor Block

    The DeduplicateRecord processor block can remove row-level duplicates from a flowfile containing multiple records using either a hash set or a bloom filter depending on the filter type you choose.

    A Bloom filter uses a constant (and therefore memory-efficient) amount of space at the expense of probabilistic duplicate detection. The processor lets you set the filter's precision, which determines how space-efficient the Bloom filter is.

    You can instead set the filter type to a hash set for exact duplicate detection, at the expense of higher memory usage. For very large datasets, consider a Bloom filter if a small number of false positives is acceptable, meaning the occasional unique record may be incorrectly flagged as a duplicate and dropped.
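    To make the hash-set strategy concrete, here is a minimal Python sketch (plain Python, not NiFi code) of exact deduplication keyed on the (CompanyName, EmailAddress) pair, using the sample dataset from the question:

    ```python
    # Minimal sketch of hash-set deduplication on (CompanyName, EmailAddress).
    # This mirrors what the HashSet filter type guarantees: exact detection,
    # with memory growing in proportion to the number of distinct keys.
    rows = [
        {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 105},
        {"CompanyName": "Big Org B", "Name": "Mike", "EmailAddress": "mike@example.com", "Rate": 130},
        {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 140},
        {"CompanyName": "Big Org C", "Name": "Brad", "EmailAddress": "brad@example.com", "Rate": 110},
    ]

    seen = set()
    unique_rows = []
    for row in rows:
        key = (row["CompanyName"], row["EmailAddress"])
        if key not in seen:  # first occurrence wins, as in the example output
            seen.add(key)
            unique_rows.append(row)

    print([r["Rate"] for r in unique_rows])  # the 105, 130 and 110 rows survive
    ```

    The duplicate row with Rate 140 is discarded because its (CompanyName, EmailAddress) key was already seen, which matches the expected output table above.
    
    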

    You then define RecordPath values as dynamic properties on the processor, like this:

    RecordPath Value
    /CompanyName ${field.value}
    /EmailAddress ${field.value}

    These two field values are concatenated (the default join character is ~) and then hashed, or left unhashed, depending on how you configure the processor. The resulting value is checked against, and stored in, the BloomFilter or HashSet to determine whether the record is a duplicate.
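    A rough Python sketch of that key construction is below. The join character ~ comes from the description above; the choice of SHA-256 here is an assumption for illustration, since the actual hash algorithm in NiFi depends on processor configuration:

    ```python
    import hashlib

    def record_key(record, paths=("CompanyName", "EmailAddress"), join_char="~"):
        """Roughly mimic the dedup key: join the selected field values
        with the join character, then hash the result.
        SHA-256 is an illustrative assumption, not NiFi's fixed choice."""
        joined = join_char.join(str(record[p]) for p in paths)
        return hashlib.sha256(joined.encode("utf-8")).hexdigest()

    a = {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 105}
    b = {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 140}
    print(record_key(a) == record_key(b))  # True: same key despite differing Rate
    ```

    Because Rate is not part of the RecordPath properties, the two rows above produce the same key, which is exactly why the second one is treated as a duplicate.
    
    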