python-polars

Find possible subsequences in a sequential time series in a polars DataFrame


I will try to explain this as well as possible, because I am unfortunately quite new to polars. I have a large time-series dataset where each separate time series is identified by a group_idx. Additionally, there is a time_idx column that identifies which of the possible time steps are present for that series; each present step has a corresponding target value. As a minimal example, consider the following:

import polars as pl

pl.Config(fmt_table_cell_list_len=10, fmt_str_lengths=80)

min_df = pl.DataFrame({
    "group_idx": [0, 1, 2, 3], "time_idx": [[0, 1, 2, 3], [2, 3], [0, 2, 3], [0,3]]
})
shape: (4, 2)
┌───────────┬──────────────┐
│ group_idx ┆ time_idx     │
│ ---       ┆ ---          │
│ i64       ┆ list[i64]    │
╞═══════════╪══════════════╡
│ 0         ┆ [0, 1, 2, 3] │
│ 1         ┆ [2, 3]       │
│ 2         ┆ [0, 2, 3]    │
│ 3         ┆ [0, 3]       │
└───────────┴──────────────┘

Here, the time range in the dataset is 4 steps long, but not all time steps are present in every individual series. While group_idx=0 has all steps present, group_idx=3 only has steps 0 and 3, meaning that for steps 1 and 2 no recorded target value exists.
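
To make the gaps explicit, here is a quick check of how many steps are missing per group (a sketch, assuming the full range is 4 steps long):

min_df.with_columns(
    n_missing=4 - pl.col("time_idx").list.len()  # 0 for group 0, 2 for group 3, etc.
)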

Now, I would like to obtain all possible subsequences, so that we start from each possible time step for a given sequence length and go at most up to the maximum time step (in this case 3). For example, for sequence_length=3, the expected output would be:

result_df = pl.DataFrame({
    "group_idx": [0, 0, 1, 1, 2, 2, 3, 3],
    "time_idx": [
        [0, 1, 2, 3], [0, 1, 2, 3], [2, 3], [2, 3],
        [0, 2, 3], [0, 2, 3], [0, 3], [0, 3],
    ],
    "sub_sequence": [
        [0, 1, 2], [1, 2, 3], [None, None, 2], [None, 2, 3],
        [0, None, 2], [None, 2, 3], [0, None, None], [None, None, 3],
    ],
})
shape: (8, 3)
┌───────────┬──────────────┬─────────────────┐
│ group_idx ┆ time_idx     ┆ sub_sequence    │
│ ---       ┆ ---          ┆ ---             │
│ i64       ┆ list[i64]    ┆ list[i64]       │
╞═══════════╪══════════════╪═════════════════╡
│ 0         ┆ [0, 1, 2, 3] ┆ [0, 1, 2]       │
│ 0         ┆ [0, 1, 2, 3] ┆ [1, 2, 3]       │
│ 1         ┆ [2, 3]       ┆ [null, null, 2] │
│ 1         ┆ [2, 3]       ┆ [null, 2, 3]    │
│ 2         ┆ [0, 2, 3]    ┆ [0, null, 2]    │
│ 2         ┆ [0, 2, 3]    ┆ [null, 2, 3]    │
│ 3         ┆ [0, 3]       ┆ [0, null, null] │
│ 3         ┆ [0, 3]       ┆ [null, null, 3] │
└───────────┴──────────────┴─────────────────┘

All of this should be computed within polars, because the real dataset is much larger both in terms of the number of time series and time series length.

Edit: Based on the suggestion by @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ, I have tried the following on the actual dataset (~200 million rows after .explode()). I forgot to mention that we can assume that group_idx and time_idx are already sorted. However, this query gets killed.

(
    min_df.lazy()
    .with_columns(
        pl.col("time_idx").alias("time_idx_nulls")
    )
    .rolling(
        index_column='time_idx',
        group_by='group_idx',
        period=str(max_sequence_length) + 'i',
    )
    .agg(pl.col("time_idx_nulls"))
    .filter(pl.col('time_idx_nulls').list.len() == max_sequence_length)
    .collect()
)

Solution

  • Here's an algorithm that needs only the desired sub-sequence length as input. It uses rolling to create your sub-sequences.

    period = 3
    min_df = min_df.explode('time_idx')
    (
        # build the dense grid: every observed time step for every group
        min_df.get_column('group_idx').unique().to_frame()
        .join(
            min_df.get_column('time_idx').unique().to_frame(),
            how='cross'
        )
        # left-join the observed rows back in; missing steps become nulls
        .join(
            min_df.with_columns(pl.col('time_idx').alias('time_idx_nulls')),
            on=['group_idx', 'time_idx'],
            how='left',
        )
        # rolling window of `period` steps per group
        .rolling(
            index_column='time_idx',
            group_by='group_idx',
            period=str(period) + 'i',
        )
        .agg(pl.col("time_idx_nulls"))
        # keep only full-length windows
        .filter(pl.col('time_idx_nulls').list.len() == period)
        .sort('group_idx')
    )
    
    shape: (8, 3)
    ┌───────────┬──────────┬─────────────────┐
    │ group_idx ┆ time_idx ┆ time_idx_nulls  │
    │ ---       ┆ ---      ┆ ---             │
    │ i64       ┆ i64      ┆ list[i64]       │
    ╞═══════════╪══════════╪═════════════════╡
    │ 0         ┆ 2        ┆ [0, 1, 2]       │
    │ 0         ┆ 3        ┆ [1, 2, 3]       │
    │ 1         ┆ 2        ┆ [null, null, 2] │
    │ 1         ┆ 3        ┆ [null, 2, 3]    │
    │ 2         ┆ 2        ┆ [0, null, 2]    │
    │ 2         ┆ 3        ┆ [null, 2, 3]    │
    │ 3         ┆ 2        ┆ [0, null, null] │
    │ 3         ┆ 3        ┆ [null, null, 3] │
    └───────────┴──────────┴─────────────────┘
    

    And for example, with period = 2:

    shape: (12, 3)
    ┌───────────┬──────────┬────────────────┐
    │ group_idx ┆ time_idx ┆ time_idx_nulls │
    │ ---       ┆ ---      ┆ ---            │
    │ i64       ┆ i64      ┆ list[i64]      │
    ╞═══════════╪══════════╪════════════════╡
    │ 0         ┆ 1        ┆ [0, 1]         │
    │ 0         ┆ 2        ┆ [1, 2]         │
    │ 0         ┆ 3        ┆ [2, 3]         │
    │ 1         ┆ 1        ┆ [null, null]   │
    │ 1         ┆ 2        ┆ [null, 2]      │
    │ 1         ┆ 3        ┆ [2, 3]         │
    │ 2         ┆ 1        ┆ [0, null]      │
    │ 2         ┆ 2        ┆ [null, 2]      │
    │ 2         ┆ 3        ┆ [2, 3]         │
    │ 3         ┆ 1        ┆ [0, null]      │
    │ 3         ┆ 2        ┆ [null, null]   │
    │ 3         ┆ 3        ┆ [null, 3]      │
    └───────────┴──────────┴────────────────┘
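
    Since only the window length changes between runs, it can be convenient to wrap the query in a small helper function (hypothetical, not part of the original answer; it expects the frame after .explode('time_idx'), as above, and sorts the unique time steps so rolling sees a sorted index):

    def sub_sequences(exploded: pl.DataFrame, period: int) -> pl.DataFrame:
        # exploded: the long-format frame with one row per (group_idx, time_idx)
        return (
            exploded.get_column('group_idx').unique().to_frame()
            .join(
                exploded.get_column('time_idx').unique().sort().to_frame(),
                how='cross'
            )
            .join(
                exploded.with_columns(pl.col('time_idx').alias('time_idx_nulls')),
                on=['group_idx', 'time_idx'],
                how='left',
            )
            .rolling(
                index_column='time_idx',
                group_by='group_idx',
                period=str(period) + 'i',
            )
            .agg(pl.col('time_idx_nulls'))
            .filter(pl.col('time_idx_nulls').list.len() == period)
            .sort('group_idx')
        )

    sub_sequences(min_df, period=2)  # min_df is the exploded frame from above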
    

    Edit: managing RAM requirements

    One way that we can manage RAM requirements (for this, or any other algorithm on large datasets) is to find ways to divide-and-conquer.

    If we look at our particular problem, each line in the input dataset leads to results that are independent of any other line. We can use this fact to apply our algorithm in batches.
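
    In outline, the batching pattern looks like this (a sketch; process_batch is a placeholder for the per-slice query shown in full further below):

    def process_batch(batch: pl.DataFrame) -> pl.DataFrame:
        # stand-in for the cross-join + rolling query applied to one slice
        return batch

    slice_size = 10_000
    result = pl.concat(
        process_batch(min_df.slice(offset, slice_size))  # slice is a cheap window
        for offset in range(0, min_df.height, slice_size)
    )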

    But first, let's create some data that leads to a large problem:

    min_time = 0
    max_time = 1_000
    nbr_groups = 400_000
    min_df = (
        pl.DataFrame({"time_idx": [list(range(min_time, max_time, 2))]})
        .join(
            pl.int_range(nbr_groups, eager=True).alias("group_idx").to_frame(),
            how="cross"
        )
    )
    min_df.explode('time_idx')  # shown only to illustrate the exploded size; not assigned
    
    shape: (200000000, 2)
    ┌──────────┬───────────┐
    │ time_idx ┆ group_idx │
    │ ---      ┆ ---       │
    │ i64      ┆ i64       │
    ╞══════════╪═══════════╡
    │ 0        ┆ 0         │
    │ 2        ┆ 0         │
    │ 4        ┆ 0         │
    │ 6        ┆ 0         │
    │ ...      ┆ ...       │
    │ 992      ┆ 399999    │
    │ 994      ┆ 399999    │
    │ 996      ┆ 399999    │
    │ 998      ┆ 399999    │
    └──────────┴───────────┘
    

    The input dataset, when exploded, is 200 million records, so roughly the size you describe. (Of course, using divide-and-conquer, we won't explode the full dataset.)

    To divide-and-conquer this, we'll slice our input dataset into smaller datasets, run the algorithm on the smaller datasets, and then concat the results into one large dataset. (One nice feature of slice is that it's very cheap - it's simply a window into the original dataset, so it consumes very little additional RAM.)

    Notice the slice_size variable. You'll need to experiment with this value on your particular computing platform. You want to set this as large as your RAM requirements allow. If set too low, your program will take too long. If set too high, your program will crash. (I've arbitrarily set this to 10,000 as a starting value.)

    # the full range of possible time steps, used to build the dense grid
    time_index_df = (
        pl.int_range(min_time, max_time, eager=True, dtype=pl.Int64)
        .alias("time_idx")
        .to_frame()
        .lazy()
    )


    period = 3
    slice_size = 10_000
    result = pl.concat(
        [
            (
                # dense grid: every time step for every group in this slice
                time_index_df
                .join(
                    min_df
                    .lazy()
                    .slice(next_index, slice_size)
                    .select("group_idx"),
                    how="cross",
                )
                # left-join the observed rows of this slice; gaps become nulls
                .join(
                    min_df
                    .lazy()
                    .slice(next_index, slice_size)
                    .explode('time_idx')
                    .with_columns(pl.col("time_idx").alias("time_idx_nulls")),
                    on=["group_idx", "time_idx"],
                    how="left",
                )
                # rolling window of period steps per group
                .rolling(
                    index_column='time_idx',
                    group_by='group_idx',
                    period=str(period) + 'i',
                )
                .agg(pl.col("time_idx_nulls"))
                # keep only full-length windows
                .filter(pl.col('time_idx_nulls').list.len() == period)
                .select('group_idx', 'time_idx_nulls')
                .collect()
            )
            # one slice (batch) of groups at a time
            for next_index in range(0, min_df.height, slice_size)
        ]
    )
    
    result.sort('group_idx')
    
    shape: (399200000, 2)
    ┌───────────┬───────────────────┐
    │ group_idx ┆ time_idx_nulls    │
    │ ---       ┆ ---               │
    │ i64       ┆ list[i64]         │
    ╞═══════════╪═══════════════════╡
    │ 0         ┆ [0, null, 2]      │
    │ 0         ┆ [null, 2, null]   │
    │ 0         ┆ [2, null, 4]      │
    │ 0         ┆ [null, 4, null]   │
    │ ...       ┆ ...               │
    │ 399999    ┆ [994, null, 996]  │
    │ 399999    ┆ [null, 996, null] │
    │ 399999    ┆ [996, null, 998]  │
    │ 399999    ┆ [null, 998, null] │
    └───────────┴───────────────────┘
    

    Some other things

    You do actually need to use the joins. The joins are used to fill in the "holes" in your sequences with null values.
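
    To see what the joins contribute on the minimal example (a sketch; the small frame is rebuilt here because min_df now holds the large synthetic data):

    small = pl.DataFrame({
        "group_idx": [0, 1, 2, 3],
        "time_idx": [[0, 1, 2, 3], [2, 3], [0, 2, 3], [0, 3]],
    }).explode("time_idx")

    (
        small.get_column("group_idx").unique().to_frame()
        .join(small.get_column("time_idx").unique().sort().to_frame(), how="cross")
        .join(
            small.with_columns(pl.col("time_idx").alias("time_idx_nulls")),
            on=["group_idx", "time_idx"],
            how="left",
        )
        .sort("group_idx", "time_idx")
    )
    # group_idx=3 now has rows for time_idx 1 and 2 with time_idx_nulls = null;
    # without the joins those rows would not exist, and rolling could never
    # place nulls into the windows.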

    Also, notice that I've put each slice/batch into lazy mode, but not the entire algorithm. Depending on your computing platform, using lazy mode for the entire algorithm may again overwhelm your system, as Polars attempts to spread the work across multiple processors, which could lead to more out-of-memory situations.

    Also note the humongous size of my output dataset: almost 400 million records. I did this purposely as a reminder that your output dataset may be the ultimate problem. That is, any algorithm would fail if the result dataset is larger than your RAM can hold.