Search code examples
scaladate-range

Split overlapping ranges into all unique ranges


I am trying to split a dynamic number of ranges with associated attributes, so that whenever 2 or more ranges overlap, the overlapping section(s) are split into unique ranges with the combination of all overlapping attributes.

datasets are of type

case class testCC(bgn_ts: java.sql.Timestamp, end_ts: java.sql.Timestamp, supply1: Int, supply2: Int)

Now data dataset1 will always have supply2 as 0 and dataset2 will always have supply 1 as 0.

                         Set1                                    
bgn_ts            | end_ts            |  supply1   |supply2 |    
2020-12-13T11:45  | 2020-12-16T11:45  | 10         |  0     |    
2020-12-16T11:45  | 9999-12-31T00:00  | 11         |  0     |

                         Set2
bgn_ts            | end_ts            |  supply1   |supply2 | 
2020-12-15T11:45  | 9999-12-31T00:00  | 0          |  12    |

This should result in

bgn_ts            | end_ts            |  supply1   |supply2 |
2020-12-13T11:45  | 2020-12-15T11:45  |   10       |  0     |
2020-12-15T11:45  | 2020-12-16T11:45  |   10       |  12    |
2020-12-16T11:45  | 9999-12-31T00:00  |   11       |  12    |

Another example:

                         Set1                                    
bgn_ts            | end_ts            |  supply1   |supply2 |    
2020-12-13T11:45  | 2020-12-16T11:45  | 10         |  0     |    
2020-12-16T11:45  | 2020-12-17T11:45  | 12         |  0     |
2020-12-17T11:45  | 9999-12-31T00:00  | 20         |  0     |

                         Set2                                    
bgn_ts            | end_ts            |  supply1   |supply2 |    
2020-12-13T11:45  | 2020-12-15T11:45  | 0          | 10     |    
2020-12-15T11:45  | 2020-12-16T11:45  | 0          |  9     |
2020-12-16T11:45  | 2020-12-18T11:45  | 0          | 12     |
2020-12-18T11:45  | 9999-12-31T00:00  | 0          | 21     |

This should result in

bgn_ts            | end_ts            |  supply1   |supply2 |    
2020-12-13T11:45  | 2020-12-15T11:45  | 10         | 10     |    
2020-12-15T11:45  | 2020-12-16T11:45  | 10         |  9     |
2020-12-16T11:45  | 2020-12-17T11:45  | 12         | 12     |
2020-12-17T11:45  | 2020-12-18T11:45  | 20         | 12     |
2020-12-18T11:45  | 9999-12-31T00:00  | 20         | 21     |

Below is the code I tried:

implicit def ts_ordering: Ordering[Timestamp] = new Ordering[Timestamp] {
            override def compare(x: Timestamp, y: Timestamp): Int = x compareTo y
          }

set2.sortBy(_.end_ts)
.foldLeft(set1.sortBy(_.end_ts)) {
  case (result, set2_entry) =>
    result.span(!_.end_ts.after(set2_entry.bgn_ts)) match {
      case (before, remaining) =>
        val (overlap, after) = remaining.span(_.bgn_ts.before(set2_entry.end_ts))
        before ++ (
          overlap.flatMap {
            case o if o.bgn_ts.equals(set2_entry.bgn_ts) && o.end_ts.equals(set2_entry.end_ts) =>
              o.copy(supply2 = set2_entry.supply2) :: Nil
            case o if o.bgn_ts.before(set2_entry.bgn_ts) && o.end_ts.after(set2_entry.end_ts) =>
              // need to split into three here, set2 record is entirely within this record
              o.copy(end_ts = set2_entry.bgn_ts) ::
              o.copy(bgn_ts = set2_entry.bgn_ts, end_ts = set2_entry.end_ts, supply2 = set2_entry.supply2) ::
              o.copy(bgn_ts = set2_entry.end_ts) ::
              Nil
            case o if !o.bgn_ts.after(set2_entry.bgn_ts) =>
              // split into two here, set2 record overlaps after the beginning of this record
              o.copy(end_ts = set2_entry.bgn_ts) ::
              o.copy(bgn_ts = set2_entry.bgn_ts, supply2 = set2_entry.supply2) ::
              Nil
            case o =>
              // split into two here, set2 record overlaps before the end of this record
              o.copy(end_ts = set2_entry.end_ts, supply2 = set2_entry.supply2) ::
              o.copy(bgn_ts = set2_entry.end_ts) ::
              Nil
          }
        ) ++ after
    }
}

To make it easier for testing, I'm supplying the test data:

Example1
val set1 = List(
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
        end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply2 = 0, supply1 = 10),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
        end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply2 = 0, supply1 = 11)
)

val set2 = List(
  testCC(
    bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00"),
    end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 0, supply2 = 12)
)

val expected1 = List(
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"), supply1 = 10, supply2 = 0),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"), supply1 = 10, supply2 = 12),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"),
        end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 11, supply2 = 12)
    )


Example2
val set1 = List(
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
        end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply2 = 0, supply1 = 10),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
        end_ts = Timestamp.valueOf("2020-12-17 11:45:00"), supply2 = 0, supply1 = 12),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-17 11:45:00"),
        end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply2 = 0, supply1 = 20)
    )

val set2 = List(
  testCC(
    bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00"),
    end_ts = Timestamp.valueOf("2020-12-15 11:45:00"), supply1 = 0, supply2 = 10),
  testCC(
    bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00"),
    end_ts = Timestamp.valueOf("2020-12-16 11:45:00"), supply1 = 0, supply2 = 9),
  testCC(
    bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00"),
    end_ts = Timestamp.valueOf("2020-12-18 11:45:00"), supply1 = 0, supply2 = 12),
  testCC(
    bgn_ts = Timestamp.valueOf("2020-12-18 11:45:00"),
    end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 0, supply2 = 21)
)

val expected1 = List(
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-13 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"), supply1 = 10, supply2 = 10),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-15 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"), supply1 = 10, supply2 = 9),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-16 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-17 11:45:00.0"), supply1 = 12, supply2 = 12),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-17 11:45:00.0"),
        end_ts = Timestamp.valueOf("2020-12-18 11:45:00.0"), supply1 = 20, supply2 = 12),
      testCC(
        bgn_ts = Timestamp.valueOf("2020-12-18 11:45:00.0"),
        end_ts = Timestamp.valueOf("9999-12-31 00:00:00"), supply1 = 20, supply2 = 21)
    )

Solution

  • This appears to work for the 2 test examples provided.

    import java.sql.Timestamp
    
    case class TestCC(bgn_ts: Timestamp, end_ts: Timestamp
                     ,supply1: Int, supply2: Int)
    
    def mergeRanges(lst1: List[TestCC]
                   ,lst2: List[TestCC]): List[TestCC] = {
      val lsts = lst1 ++ lst2
      val bMap = lsts.groupMapReduce(_.bgn_ts)(identity){
        case (a,b) => TestCC(a.bgn_ts, b.end_ts
                            ,a.supply1+b.supply1, a.supply2+b.supply2)
      }
      val eMap = lsts.groupMapReduce(_.end_ts)(identity){
        case (a,b) => TestCC(a.bgn_ts, b.end_ts
                            ,a.supply1+b.supply1, a.supply2+b.supply2)
      }
      List.unfold(bMap.keys.toList.sorted ->
                  eMap.keys.toList.sorted){
        case (Nil, Nil) => None
        case (Nil,_)|(_,Nil) => throw new Error("unexpected")
        case (b::bs, e::es) =>
          if (bs.nonEmpty && bs.head.before(e))
            Some(bMap(b).copy(end_ts = bs.head) -> (bs,e::es))
          else
            Some(TestCC(b, e
                       ,bMap(b).supply1 max eMap(e).supply1
                       ,bMap(b).supply2 max eMap(e).supply2) -> (bs,es))
      }
    }
    

    The code makes a few assumptions about the input data and I'm not sure how well it will handle malformed input, but give it a try and see if it gets you closer to your goal.


    2nd attempt

    After thinking it over I decided that a different approach was required.

    def mergeRanges(lst1: List[TestCC]
                   ,lst2: List[TestCC]): List[TestCC] = {
      val lsts = lst1 ++ lst2
      val s1Map = lsts.filter(_.supply1 > 0)
                      .sortBy(_.bgn_ts)
                      .foldLeft(Map.empty[Timestamp,Int]){
                        case (mp, TestCC(b,e,s1,_)) =>
                          mp + (b -> s1) + (e -> s1)
                      }.withDefaultValue(0)
      val s1Lst = s1Map.keys.toList.sorted.reverse
      val s2Map = lsts.filter(_.supply2 > 0)
                      .sortBy(_.bgn_ts)
                      .foldLeft(Map.empty[Timestamp,Int]){
                        case (mp, TestCC(b,e,_,s2)) =>
                          mp + (b -> s2) + (e -> s2)
                      }.withDefaultValue(0)
      val s2Lst = s2Map.keys.toList.sorted.reverse
      lsts.flatMap{case TestCC(b,e,_,_) => List(b,e)}
          .distinct.sorted.sliding(2).toList
          .map{case List(ts1, ts2) =>
            TestCC(ts1, ts2
              ,s1Lst.dropWhile(_.after(ts1)).headOption.fold(0)(s1Map)
              ,s2Lst.dropWhile(_.after(ts1)).headOption.fold(0)(s2Map))
          }
    }
    

    I believe this passes all current test examples.