
Efficient way to partially update a record in Dataframe


I have a system that accumulates batch data into a snapshot.

Each record in a batch contains a unique_id, a version, and multiple other columns.

Previously, whenever a unique_id arrived in a new batch with a version greater than the version present in the snapshot, the system replaced the entire record and rewrote it as a new record. This is essentially a merge of two DataFrames based on the version.

For example :

 Snapshot:  <Uid> | <Version> | <col1> | <col2>
            -----------------------------------
              A1  |     1     |   ab   |   cd
              A2  |     1     |   ef   |   gh

 New Batch:  <Uid> | <Version> | <col1>
            ---------------------------
              A3  |     1     |   gh
              A1  |     2     |   hh
Note that col2 is absent in the new batch.

 After the merge it becomes:

            <Uid> | <Version> | <col1> | <col2>
            -----------------------------------
              A3  |     1     |   gh   |  Null
              A1  |     2     |   hh   |  Null
              A2  |     1     |   ef   |   gh

The problem here is that even though no data for col2 arrived for Uid A1, after the merge that column is replaced with a null value, so the older value of the column (cd) is lost.
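The old full-record merge can be sketched roughly as follows (a minimal sketch with hypothetical column names `uid` and `version`; the real system may differ):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

// Sketch of the old behaviour: align the batch to the snapshot schema
// (missing columns become null), union, and keep only the highest
// version per uid -- which is exactly how A1's old col2 value is lost.
def fullRecordMerge(snapshot: DataFrame, batch: DataFrame): DataFrame = {
  // Add any snapshot columns missing from the batch as typed nulls,
  // then reorder to the snapshot's column order for a positional union
  val aligned = snapshot.columns.foldLeft(batch) { (df, c) =>
    if (df.columns.contains(c)) df
    else df.withColumn(c, lit(null).cast(snapshot.schema(c).dataType))
  }.select(snapshot.columns.map(col): _*)

  // Keep the row with the highest version for each uid
  val latestPerUid = Window.partitionBy("uid").orderBy(col("version").desc)

  snapshot.union(aligned)
    .withColumn("rn", row_number().over(latestPerUid))
    .where(col("rn") === 1)
    .drop("rn")
}
```

With the example data above, this yields A1 = (2, hh, null): the whole record is taken from the batch, so col2's old value cd is gone.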

Now I want to replace only the columns for which new data has arrived, i.e. the expected output is:

            <Uid> | <Version> | <col1> | <col2>
            -----------------------------------
              A3  |     1     |   gh   |  Null
              A1  |     2     |   hh   |   cd
              A2  |     1     |   ef   |   gh

Note that for unique id A1 the col2 value is intact.

However, if the batch has the record for A1 as:

 New Batch:  <Uid> | <Version> | <col1> | <col2>
            -----------------------------------
              A1  |     2     |   hh   |   uu

then the output will be:

            <Uid> | <Version> | <col1> | <col2>
            -----------------------------------
              A1  |     2     |   hh   |   uu
              A2  |     1     |   ef   |   gh

Here the entire record of A1 is replaced, since the batch supplied values for every column.

In the current system I am using Spark and storing the data as Parquet. I can tweak the merge process to incorporate this change.

However, I would like to know whether this is an optimal way to store data for this use case.

I am evaluating HBase and Hive ORC, along with possible changes I could make to the merge process.

Any suggestion will be highly appreciated.


Solution

  • As far as I understand, you need to use a full outer join between the snapshot and the journal (delta) and then use coalesce, for instance:

      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.functions.{coalesce, when}

      def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {

        // Rows match when all join columns are equal
        val joinExpr = joinColumnNames
          .map(column => snapshot(column) === journal(column))
          .reduceLeft(_ && _)

        // After the full outer join, a snapshot row has no journal
        // counterpart when all of the journal's join columns are null
        val isThereNoJournalRecord = joinColumnNames
          .map(jCol => journal(jCol).isNull)
          .reduceLeft(_ && _)

        // Per column: keep the snapshot value when there is no journal record,
        // otherwise take the journal value, falling back to the snapshot value
        // when the journal value is null
        val selectClause = snapshot.columns
          .map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)

        snapshot
          .join(journal, joinExpr, "full_outer")
          .select(selectClause: _*)
      }
    

    This merges the snapshot with the journal, falling back to the snapshot value whenever the journal value is null.

    Hope it helps!
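    As a quick check, the function can be exercised on the example data from the question (a sketch assuming a local SparkSession; since the batch lacks col2, it is first added as a typed null column so the schemas line up):

    ```scala
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{coalesce, lit, when}

    // Same function as above, repeated so this sketch is self-contained
    def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
      val joinExpr = joinColumnNames.map(c => snapshot(c) === journal(c)).reduceLeft(_ && _)
      val isThereNoJournalRecord = joinColumnNames.map(c => journal(c).isNull).reduceLeft(_ && _)
      val selectClause = snapshot.columns.map(c =>
        when(isThereNoJournalRecord, snapshot(c)).otherwise(coalesce(journal(c), snapshot(c))) as c)
      snapshot.join(journal, joinExpr, "full_outer").select(selectClause: _*)
    }

    val spark = SparkSession.builder().master("local[1]").appName("partial-update").getOrCreate()
    import spark.implicits._

    val snapshot = Seq(("A1", 1, "ab", "cd"), ("A2", 1, "ef", "gh"))
      .toDF("uid", "version", "col1", "col2")

    // The new batch has no col2, so add it as a null column before merging
    val journal = Seq(("A3", 1, "gh"), ("A1", 2, "hh"))
      .toDF("uid", "version", "col1")
      .withColumn("col2", lit(null).cast("string"))

    applyDeduplicatedJournal(snapshot, journal, Seq("uid")).show()
    // A1 takes version 2 and col1 = hh from the batch but keeps col2 = cd;
    // A2 is untouched; A3 is inserted with col2 = null
    ```

    Because the added col2 column is all nulls, coalesce falls back to the snapshot's cd for A1, which matches the expected output in the question.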