I have a system which accumulates batch data in a snapshot.
Each record in a batch contains a unique_id, a version, and several other columns.
Previously, whenever a new batch contained a unique_id with a version greater than the version present in the snapshot, the system replaced the entire record and rewrote it as a new record. This is essentially a merge of two DataFrames based on the version.
For example:
Snapshot: <Uid> <Version> <col1> <col2>
-----------------
A1 | 1 | ab | cd
A2 | 1 | ef | gh
New Batch: <Uid> <Version> <col1>
------------------
A3 | 1 | gh
A1 | 2 | hh
Note that col2 is absent from the new batch.
After the merge, the snapshot becomes:
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | Null
A2 | 1 | ef | gh
The problem is that even though no col2 data arrived for Uid A1, that column is replaced by a null value after the merge, so the older value of the column is lost.
Now I want to replace only the columns for which data has arrived, i.e. the expected output is:
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | cd
A2 | 1 | ef | gh
Note that for unique id A1 the col2 value is intact.
However, if the batch contains the following record for A1:
New Batch: <Uid> <Version> <col1> <col2>
------------------
A1 | 2 | hh | uu
The output will be:
<Uid> <Version> <col1> <col2>
------------------
A1 | 2 | hh | uu
A2 | 1 | ef | gh
Here the entire record of A1 is replaced.
The current system uses Spark and stores the data as Parquet. I can tweak the merge process to incorporate this change.
However, I would like to know whether this is an optimal way to store data for this use case.
I am evaluating HBase and Hive ORC, along with possible changes I can make to the merge process.
Any suggestions would be highly appreciated.
As far as I understand, you need a full outer join between the snapshot and the journal (delta), followed by coalesce, for instance:
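import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, when}

// Assumes the journal is already deduplicated and only holds rows newer than the snapshot.
def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
  // Match snapshot and journal rows on every key column.
  val joinExpr = joinColumnNames
    .map(column => snapshot(column) === journal(column))
    .reduceLeft(_ && _)
  // All journal keys are null when the full outer join found no journal row.
  val isThereNoJournalRecord = joinColumnNames
    .map(jCol => journal(jCol).isNull)
    .reduceLeft(_ && _)
  val selectClause = snapshot.columns.map { col =>
    // A column missing from the journal (col2 in the question) keeps the snapshot value.
    if (!journal.columns.contains(col)) snapshot(col)
    else when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col
  }
  snapshot
    .join(journal, joinExpr, "full_outer")
    .select(selectClause: _*)
}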
In this case the snapshot is merged with the journal, falling back to the snapshot value whenever the journal value is null.
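To check the merge rule without a Spark cluster, the same idea can be modelled on plain Scala collections. This is only a sketch: `Rec`, `MergeSketch`, `mergeRow` and `merge` are hypothetical names, `None` stands in for a SQL NULL, and ignoring journal rows whose version is not greater than the snapshot version is an assumption taken from the question rather than something the Spark function above does.

```scala
// Hypothetical in-memory model of a record; not part of any Spark API.
// None plays the role of SQL NULL, and a column may be missing from the map entirely.
final case class Rec(uid: String, version: Int, cols: Map[String, Option[String]])

object MergeSketch {
  // Column-wise merge of one snapshot row with one journal row for the same uid.
  def mergeRow(old: Rec, incoming: Rec): Rec =
    if (incoming.version <= old.version) old // assumption: stale updates are ignored
    else {
      val names = old.cols.keySet ++ incoming.cols.keySet
      val merged = names.map { name =>
        // Same idea as coalesce(journal(col), snapshot(col)):
        // take the incoming value if present, otherwise keep the old one.
        val value = incoming.cols.getOrElse(name, None)
          .orElse(old.cols.getOrElse(name, None))
        name -> value
      }.toMap
      Rec(old.uid, incoming.version, merged)
    }

  // Equivalent of the full outer join on uid: rows from either side survive.
  def merge(snapshot: Seq[Rec], journal: Seq[Rec]): Map[String, Rec] = {
    val bySnapshotUid = snapshot.map(r => r.uid -> r).toMap
    journal.foldLeft(bySnapshotUid) { (acc, j) =>
      acc.get(j.uid) match {
        case Some(s) => acc.updated(j.uid, mergeRow(s, j)) // uid on both sides
        case None    => acc.updated(j.uid, j)              // new uid, insert as is
      }
    }
  }
}
```

With the question's data, `MergeSketch.merge` keeps `cd` in col2 for A1 while taking `hh` for col1 and version 2, leaves A2 untouched, and adds A3 with no col2 value.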
Hope it helps!