I am trying to split a dynamic number of ranges with associated attributes, so that whenever 2 or more ranges overlap, the overlapping section(s) are split into unique ranges with the combination of all overlapping attributes.
datasets are of type
case class testCC(bgn_ts: java.sql.Timestamp, end_ts: java.sql.Timestamp, supply1: Int, supply2: Int)
Now data dataset1 will always have supply2 as 0 and dataset2 will always have supply 1 as 0.
Set1
bgn_ts | end_ts | supply1 |supply2 |
2020-12-13T11:45 | 2020-12-16T11:45 | 10 | 0 |
2020-12-16T11:45 | 9999-12-31T00:00 | 11 | 0 |
Set2
bgn_ts | end_ts | supply1 |supply2 |
2020-12-15T11:45 | 9999-12-31T00:00 | 0 | 12 |
This should result in
bgn_ts | end_ts | supply1 |supply2 |
2020-12-13T11:45 | 2020-12-15T11:45 | 10 | 0 |
2020-12-15T11:45 | 2020-12-16T11:45 | 10 | 12 |
2020-12-16T11:45 | 9999-12-31T00:00 | 11 | 12 |
Another example:
Set1
bgn_ts | end_ts | supply1 |supply2 |
2020-12-13T11:45 | 2020-12-16T11:45 | 10 | 0 |
2020-12-16T11:45 | 2020-12-17T11:45 | 12 | 0 |
2020-12-17T11:45 | 9999-12-31T00:00 | 20 | 0 |
Set2
bgn_ts | end_ts | supply1 |supply2 |
2020-12-13T11:45 | 2020-12-15T11:45 | 0 | 10 |
2020-12-15T11:45 | 2020-12-16T11:45 | 0 | 9 |
2020-12-16T11:45 | 2020-12-18T11:45 | 0 | 12 |
2020-12-18T11:45 | 9999-12-31T00:00 | 0 | 21 |
This should result in
bgn_ts | end_ts | supply1 |supply2 |
2020-12-13T11:45 | 2020-12-15T11:45 | 10 | 10 |
2020-12-15T11:45 | 2020-12-16T11:45 | 10 | 9 |
2020-12-16T11:45 | 2020-12-17T11:45 | 12 | 12 |
2020-12-17T11:45 | 2020-12-18T11:45 | 20 | 12 |
2020-12-18T11:45 | 9999-12-31T00:00 | 20 | 21 |
Below is the code I tried:
implicit def ts_ordering: Ordering[Timestamp] = new Ordering[Timestamp] {
override def compare(x: Timestamp, y: Timestamp): Int = x compareTo y
}
set2.sortBy(_.end_ts)
.foldLeft(set1.sortBy(_.end_ts)) {
case (result, set2_entry) =>
result.span(!_.end_ts.after(set2_entry.bgn_ts)) match {
case (before, remaining) =>
val (overlap, after) = remaining.span(_.bgn_ts.before(set2_entry.end_ts))
before ++ (
overlap.flatMap {
case o if o.bgn_ts.equals(set2_entry.bgn_ts) && o.end_ts.equals(set2_entry.end_ts) =>
o.copy(supply2 = set2_entry.supply2) :: Nil
case o if o.bgn_ts.before(set2_entry.bgn_ts) && o.end_ts.after(set2_entry.end_ts) =>
// need to split into three here, set2 record is entirely within this record
o.copy(end_ts = set2_entry.bgn_ts) ::
o.copy(bgn_ts = set2_entry.bgn_ts, end_ts = set2_entry.end_ts, supply2 = set2_entry.supply2) ::
o.copy(bgn_ts = set2_entry.end_ts) ::
Nil
case o if !o.bgn_ts.after(set2_entry.bgn_ts) =>
// split into two here, set2 record overlaps after the beginning of this record
o.copy(end_ts = set2_entry.bgn_ts) ::
o.copy(bgn_ts = set2_entry.bgn_ts, supply2 = set2_entry.supply2) ::
Nil
case o =>
// split into two here, set2 record overlaps before the end of this record
o.copy(end_ts = set2_entry.end_ts, supply2 = set2_entry.supply2) ::
o.copy(bgn_ts = set2_entry.end_ts) ::
Nil
}
) ++ after
}
}
To make it easier for testing, I'm supplying the test data:
Example1
val set1 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply2 = 0, supply1 = 10),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply2 = 0, supply1 = 11)
)
val set2 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 0, supply2 = 12)
)
val expected1 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"), supply1 = 10, supply2 = 0),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"), supply1 = 10, supply2 = 12),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 11, supply2 = 12)
)
Example2
val set1 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply2 = 0, supply1 = 10),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-17 11:45:00"), supply2 = 0, supply1 = 12),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-17 11:45:00"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply2 = 0, supply1 = 20)
)
val set2 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-15 11:45:00"), supply1 = 0, supply2 = 10),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply1 = 0, supply2 = 9),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
end_ts = Timestamp.valueOf("2020-12-18 11:45:00"), supply1 = 0, supply2 = 12),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-18 11:45:00"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 0, supply2 = 21)
)
val expected1 = List(
testCC(
bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"), supply1 = 10, supply2 = 10),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"), supply1 = 10, supply2 = 9),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-17 11:45:00.0"), supply1 = 12, supply2 = 12),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-17 11:45:00.0"),
end_ts = Timestamp.valueOf("2020-12-18 11:45:00.0"), supply1 = 20, supply2 = 12),
testCC(
bgn_ts = Timestamp.valueOf("2020-12-18 11:45:00.0"),
end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 20, supply2 = 21)
)
This appears to work for the 2 test examples provided.
import java.sql.Timestamp
case class TestCC(bgn_ts: Timestamp, end_ts: Timestamp
,supply1: Int, supply2: Int)
def mergeRanges(lst1: List[TestCC]
,lst2: List[TestCC]): List[TestCC] = {
val lsts = lst1 ++ lst2
val bMap = lsts.groupMapReduce(_.bgn_ts)(identity){
case (a,b) => TestCC(a.bgn_ts, b.end_ts
,a.supply1+b.supply1, a.supply2+b.supply2)
}
val eMap = lsts.groupMapReduce(_.end_ts)(identity){
case (a,b) => TestCC(a.bgn_ts, b.end_ts
,a.supply1+b.supply1, a.supply2+b.supply2)
}
List.unfold(bMap.keys.toList.sorted ->
eMap.keys.toList.sorted){
case (Nil, Nil) => None
case (Nil,_)|(_,Nil) => throw new Error("unexpected")
case (b::bs, e::es) =>
if (bs.nonEmpty && bs.head.before(e))
Some(bMap(b).copy(end_ts = bs.head) -> (bs,e::es))
else
Some(TestCC(b, e
,bMap(b).supply1 max eMap(e).supply1
,bMap(b).supply2 max eMap(e).supply2) -> (bs,es))
}
}
The code makes a few assumptions about the input data and I'm not sure how well it will handle malformed input, but give it a try and see if it gets you closer to your goal.
After thinking it over I decided that a different approach was required.
def mergeRanges(lst1: List[TestCC]
,lst2: List[TestCC]): List[TestCC] = {
val lsts = lst1 ++ lst2
val s1Map = lsts.filter(_.supply1 > 0)
.sortBy(_.bgn_ts)
.foldLeft(Map.empty[Timestamp,Int]){
case (mp, TestCC(b,e,s1,_)) =>
mp + (b -> s1) + (e -> s1)
}.withDefaultValue(0)
val s1Lst = s1Map.keys.toList.sorted.reverse
val s2Map = lsts.filter(_.supply2 > 0)
.sortBy(_.bgn_ts)
.foldLeft(Map.empty[Timestamp,Int]){
case (mp, TestCC(b,e,_,s2)) =>
mp + (b -> s2) + (e -> s2)
}.withDefaultValue(0)
val s2Lst = s2Map.keys.toList.sorted.reverse
lsts.flatMap{case TestCC(b,e,_,_) => List(b,e)}
.distinct.sorted.sliding(2).toList
.map{case List(ts1, ts2) =>
TestCC(ts1, ts2
,s1Lst.dropWhile(_.after(ts1)).headOption.fold(0)(s1Map)
,s2Lst.dropWhile(_.after(ts1)).headOption.fold(0)(s2Map))
}
}
I believe this passes all current test examples.